How can I create a single-thread coroutine context under the Common pool in Kotlin?
Short requirement: the ability to create a coroutine context that executes on a single thread only (i.e. without parallelism).
Additional requirement: preferably reuse the existing CommonPool (i.e. a thread pool) for these tasks.
Kotlin coroutines do have the method newSingleThreadContext, which creates a separate thread and schedules all tasks onto it. However, this is a dedicated thread, so ~1000 such contexts would require a lot of resources.
Therefore, I'd like to have a context with the following characteristics:
- At most one task can be executing at any given time
- The context should reuse another context (e.g. the parent context), i.e. it should hold no additional threads of its own
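
For contrast, here is a minimal sketch of the dedicated-thread approach that the question wants to avoid. The helper name dedicatedThreadNames and the "dedicated-" prefix are made up for illustration. The opt-in annotations are an assumption about recent kotlinx.coroutines versions, which (as far as I know) flag newSingleThreadContext as a delicate API.

```kotlin
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: creates `count` single-thread contexts, runs one task
// in each, and returns the name of the thread every task ran on.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun dedicatedThreadNames(count: Int): List<String> = runBlocking {
    // Every call allocates a new thread that lives until close() is called.
    val contexts = List(count) { i -> newSingleThreadContext("dedicated-$i") }
    try {
        contexts.map { ctx -> withContext(ctx) { Thread.currentThread().name } }
    } finally {
        // Forgetting this leaks one thread per context.
        contexts.forEach { it.close() }
    }
}

fun main() {
    val names = dedicatedThreadNames(3)
    println("distinct threads used: ${names.toSet().size}")
}
```

Each context is backed by its own thread, so the number of distinct threads grows with the number of contexts. That is the cost that becomes a problem at ~1000 contexts.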
Solution 1:[1]
Starting from version 1.6.0 of the kotlinx.coroutines library, we can use the limitedParallelism function on a CoroutineDispatcher object, which lets you limit parallelism without creating additional thread pools and provides a unified way to create dispatchers for unbound parallelism.
Example of usage:
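import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

// User and executeQuery are assumed to be defined elsewhere in the application.
class UserRepository {
    // A view of Dispatchers.IO that runs at most one coroutine at a time
    private val dbDispatcher = Dispatchers.IO.limitedParallelism(1)

    suspend fun getUserById(userId: Int): User? = withContext(dbDispatcher) {
        executeQuery("SELECT * FROM users WHERE id = $1", userId).singleOrNull()
    }
}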
limitedParallelism(1) guarantees the parallelism restriction: at most 1 coroutine can be executed concurrently in this dispatcher.
It should solve the problem: "Maximum one task can be executed at the same time."
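
To check this behaviour yourself, here is a small sketch that counts how many coroutine bodies overlap on a limitedParallelism(1) view of Dispatchers.Default. The helper name maxConcurrency is made up for this example. The @OptIn annotation assumes a library version where limitedParallelism is still marked experimental.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: launches `taskCount` coroutines on a limitedParallelism(1)
// view and returns the highest number of bodies observed running at once.
@OptIn(ExperimentalCoroutinesApi::class)
fun maxConcurrency(taskCount: Int): Int = runBlocking {
    // No new threads are created; the view borrows threads from Dispatchers.Default.
    val serial = Dispatchers.Default.limitedParallelism(1)
    val active = AtomicInteger(0)
    val maxActive = AtomicInteger(0)

    val jobs = List(taskCount) {
        launch(serial) {
            // There is no suspension point between the increment and the
            // decrement, so any overlap would show up in maxActive.
            val now = active.incrementAndGet()
            maxActive.updateAndGet { prev -> maxOf(prev, now) }
            Thread.yield()
            active.decrementAndGet()
        }
    }
    jobs.joinAll()
    maxActive.get()
}

fun main() {
    println("max coroutines running at once: ${maxConcurrency(1_000)}")
}
```

The reported maximum should be 1. Note that the limit applies to code that is actually running: if a coroutine suspends (for example in delay), another coroutine may run on the dispatcher in the meantime, and the thread used may change between resumptions.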
Solution 2:[2]
Here's a solution:
When you say, for example, withSerialContext(Dispatchers.Default) { doWork() }, it executes doWork() on a default dispatcher thread, but all of its parts will execute one at a time, like they do in runBlocking {}. Note that even though it's one thread at a time, there is no guarantee that it will be the same thread for the whole operation.
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.withContext

suspend fun <T> withSerialContext(
    context: CoroutineDispatcher,
    block: suspend CoroutineScope.() -> T
): T = withContext(SerialContextDispatcher(context), block)

private class SerialContextDispatcher(private val target: CoroutineDispatcher) : CoroutineDispatcher() {
    private val q = ConcurrentLinkedQueue<Runnable>()

    // Whoever CASes this false->true schedules execution of runproc
    private val pending = AtomicBoolean(false)

    // Only one of these runs at a time
    private val runproc = object : Runnable {
        override fun run() {
            while (true) {
                val proc = q.poll()
                if (proc != null) {
                    try {
                        proc.run()
                    } catch (e: Throwable) {
                        // Reschedule so the remaining queued tasks still run, then rethrow
                        target.dispatch(EmptyCoroutineContext, this)
                        throw e
                    }
                } else {
                    pending.set(false)
                    // Stop unless a task arrived in the meantime and we win the right to keep draining
                    if (q.isEmpty() || !pending.compareAndSet(false, true)) {
                        return
                    }
                }
            }
        }
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        q.add(block)
        if (pending.compareAndSet(false, true)) {
            target.dispatch(EmptyCoroutineContext, runproc)
        }
    }
}
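
The queue-plus-flag technique used by SerialContextDispatcher can also be tried in isolation, without coroutines. The sketch below applies the same idea to a plain java.util.concurrent.Executor. The names SerialExecutor and measureSerialExecution are made up for this example and are not part of any library.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: runs submitted tasks one at a time on threads
// borrowed from `target`, without owning any thread itself.
class SerialExecutor(private val target: Executor) : Executor {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    // true while a drain pass is scheduled or running
    private val scheduled = AtomicBoolean(false)

    override fun execute(task: Runnable) {
        queue.add(task)
        if (scheduled.compareAndSet(false, true)) target.execute { drain() }
    }

    private fun drain() {
        while (true) {
            val task = queue.poll()
            if (task != null) {
                try {
                    task.run()
                } catch (e: Throwable) {
                    // Hand the rest of the queue to a fresh pass, then rethrow.
                    target.execute { drain() }
                    throw e
                }
            } else {
                scheduled.set(false)
                // A task may have been queued after poll() returned null.
                if (queue.isEmpty() || !scheduled.compareAndSet(false, true)) return
            }
        }
    }
}

// Returns (max tasks observed running at once, number of distinct threads used).
fun measureSerialExecution(taskCount: Int): Pair<Int, Int> {
    val pool = Executors.newFixedThreadPool(4)
    val serial = SerialExecutor(pool)
    val active = AtomicInteger(0)
    val maxActive = AtomicInteger(0)
    val threads = ConcurrentHashMap.newKeySet<String>()
    val done = CountDownLatch(taskCount)
    repeat(taskCount) {
        serial.execute {
            val now = active.incrementAndGet()
            maxActive.updateAndGet { prev -> maxOf(prev, now) }
            threads.add(Thread.currentThread().name)
            active.decrementAndGet()
            done.countDown()
        }
    }
    done.await()
    pool.shutdown()
    return maxActive.get() to threads.size
}

fun main() {
    val (maxAtOnce, threadCount) = measureSerialExecution(1_000)
    println("max tasks at once: $maxAtOnce, distinct threads: $threadCount")
}
```

The first number should be 1, while the thread count can vary from run to run, which matches the remark above that serial execution does not imply a single fixed thread. If a coroutine dispatcher is needed, kotlinx.coroutines offers the Executor.asCoroutineDispatcher() extension, which could wrap such an executor.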
Solution 3:[3]
I found that there is no simple solution for creating such a context.
There is an open issue on GitHub: https://github.com/Kotlin/kotlinx.coroutines/issues/261
I will update this question when I find the right solution.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | |
Solution 3 | |