newFixedThreadPoolContext is actively used in coroutines code as a concurrency-limiting mechanism. For example, to limit the number of concurrent requests to the database to 10, one typically defines:
val DB = newFixedThreadPoolContext(10, "DB")
and then wraps all DB invocations in withContext(DB) { ... } blocks.
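The pattern described above can be sketched as follows; this is a hypothetical illustration (the `"user-$id"` body stands in for a real blocking DB call):

```kotlin
import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class)
fun main() {
    // At most 10 of these run at the same time.
    val DB = newFixedThreadPoolContext(10, "DB")
    runBlocking {
        val results = (1..20).map { id ->
            async { withContext(DB) { "user-$id" } } // stand-in for a blocking query
        }.awaitAll()
        println(results.size)
    }
    DB.close() // must be closed explicitly, as noted below
}
```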
This approach has the following problems:

- withContext(DB) invocation performs an actual switch to a different thread, which is extremely expensive.
- newFixedThreadPoolContext references the underlying threads and must be explicitly closed when no longer used. This is quite error-prone, as programmers may use newFixedThreadPoolContext in their code without realizing this fact, thus leaking threads.

The plan is to reimplement newFixedThreadPoolContext from scratch so that it does not create any threads. Instead, there will be one shared pool of threads that creates new threads strictly when they are needed. Thus, newFixedThreadPoolContext does not create its own threads, but acts only as a semaphore that limits the number of concurrent operations running in this context.
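The "semaphore, not threads" idea can be sketched with the library's own Semaphore primitive (an illustration only; the actual dispatcher will use an internal structure, but the limiting semantics are the same):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import java.util.concurrent.atomic.AtomicInteger

fun main() = runBlocking {
    val permits = Semaphore(10)          // caps concurrency without owning threads
    val active = AtomicInteger(0)
    val peak = AtomicInteger(0)
    coroutineScope {
        repeat(100) {
            launch(Dispatchers.Default) {
                permits.withPermit {
                    val now = active.incrementAndGet()
                    peak.updateAndGet { maxOf(it, now) }
                    delay(10)            // stand-in for real work
                    active.decrementAndGet()
                }
            }
        }
    }
    println(peak.get() <= 10)            // never more than 10 running at once
}
```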
Moreover, DefaultContext, which is currently equal to CommonPool (backed by ForkJoinPool.commonPool), is going to be redefined in this way:
val DefaultContext = newFixedThreadPoolContext(defaultParallelism, "DefaultContext")
The current plan is to set defaultParallelism to nCPUs + 1 as a compromise value that ensures utilization of the underlying hardware even if one coroutine accidentally blocks, and helps us avoid issue #198.
Now, with this redefinition of DefaultContext, the code that used to define its own DB context continues to work as before (limiting the number of concurrent DB operations). However, both issues identified above are solved:
- withContext(DB) invocation does not actually perform a thread context switch anymore. It only switches the coroutine context and separately keeps track of and limits the number of concurrently running coroutines in the DB context.
- There is no need to close newFixedThreadPoolContext anymore; as it is not backed by any physical threads, there is no risk of leaking threads.

This change also affects newSingleThreadContext, as its implementation is:
fun newSingleThreadContext(name: String) = newFixedThreadPoolContext(1, name)
This _might_ break some code (feedback is welcome!), as there could have been some code in the wild that assumed that everything working in newSingleThreadContext was indeed happening in a single instance of Thread and used ThreadLocal to store something, for example. The workaround for this code is to use Executors.newSingleThreadExecutor().toCoroutineDispatcher().
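The workaround can be sketched as follows; note that in current kotlinx.coroutines releases the extension is named asCoroutineDispatcher:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() {
    // A dispatcher backed by one dedicated thread, so ThreadLocal stays valid.
    val single = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    runBlocking {
        val first = withContext(single) { Thread.currentThread().name }
        val second = withContext(single) { Thread.currentThread().name }
        println(first == second) // always the same thread
    }
    single.close()
}
```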
This issue is related to the discussion on IO dispatcher in #79. It is inefficient to use Executors.newCachedThreadPool().toCoroutineContext() due to the thread context switches. The plan, as a part of this issue, is to define the following constant:
val IO: CoroutineContext = ...
The name is to be discussed in #79
Coroutines working in this context share the same thread pool as DefaultContext, so there is no cost of thread switch when doing withContext(IO) { ... }, but there is no inherent limit on the number of such concurrently executed operations.
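For later readers: this constant eventually shipped as Dispatchers.IO. A minimal usage sketch:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    // Offload blocking work without an inherent parallelism cap tied to nCPUs.
    val data = withContext(Dispatchers.IO) {
        // a blocking read would go here
        "data"
    }
    println(data)
}
```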
Note that we also avoid issue #216 with this rewrite.
Shall we rename newFixedThreadPoolContext and newSingleThreadContext after this rewrite or leave their names as is? Can we name it better?
Should we leave newSingleThreadContext defined as before (with all the context switch cost) to avoid potentially breaking existing code? This would work especially well if newFixedThreadPoolContext is somehow renamed (old is deprecated), but newSingleThreadContext retains the old name.
UPDATE: Due to backward-compatibility requirements the actual design will likely be different. Stay tuned.
newFixedThreadPoolContext is actively used in coroutines code as a concurrency-limiting mechanism
Why not define a JVM-like Semaphore?
fun Mutex(permits: Int = 1) : Mutex
It is also possible to consider introducing:
fun Mutex.asCoroutineDispatcher(delegated : CoroutineDispatcher = DefaultDispatcher): CoroutineDispatcher
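As an illustration of the underlying suggestion, a Mutex already serializes access to a shared resource without any dedicated thread (the counter here is a hypothetical example):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

fun main() = runBlocking {
    val mutex = Mutex()
    var counter = 0
    coroutineScope {
        repeat(1000) {
            launch(Dispatchers.Default) {
                mutex.withLock { counter++ } // at most one coroutine in here
            }
        }
    }
    println(counter) // no lost updates
}
```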
The current plan is to set defaultParallelism to nCPUs + 1
Usually, GC pressure is enough to cover accidental blocking.
Setting defaultParallelism = max(nCPUs, 2) should work the same.
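As a quick arithmetic check on the two proposals (pure stdlib, no assumptions beyond the formulas themselves): max(nCPUs, 2) never exceeds nCPUs + 1, so the two values differ by at most one thread:

```kotlin
fun main() {
    // max(n, 2) <= n + 1 holds for every positive CPU count.
    val allHold = (1..128).all { n -> maxOf(n, 2) <= n + 1 }
    println(allHold)
}
```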
I consider creating a flexible thread pool and mixing blocking and non-blocking operations in it to be a different question; are you considering benchmarking a prototype?
A context switch is costly, I agree, but this cost is less than that of an I/O operation.
Coroutines working in this context share the same common pool (so no cost of thread switch when doing withContext(IO) { ... })
ForkJoinPool uses a task queue for each thread; withContext(IO) doesn't force any context switch, but it blocks the thread, so all other tasks are forced to switch to another thread.
Switching a task to another thread on a multiprocessor (NUMA) server requires refreshing the local CPU caches (L1, L2); this is the greatest cost of a context switch.
@fvasco Unfortunately, the Mutex interface (with suspending lock) is not an appropriate abstraction to efficiently implement a coroutine dispatcher on top of it. The "semaphore" has to be an internal data structure tightly coupled with the implementation of the corresponding dispatcher.
The appropriate value of defaultParallelism is questionable, but I personally like nCPUs+1 more than max(nCPUs, 2), because the former is more regular, too.
The questions of a flexible thread-pool and blocking IO are indeed different, but it looks that they can be solved with a single implementation effort. We'll definitely start implementation with benchmarks and experiment with different strategies.
The idea is that, similarly to FJP, this implementation is going to be "sticky" and will only move coroutines to another thread when absolutely necessary. We plan to be way more lazy in this respect than FJP (it can be shown that the FJP work-stealing strategy actually has an adverse performance impact on typical CSP-style code).
With respect to blocking operations, it means that we'll give a blocking operation some time to complete before moving all other coroutines to another thread. This seems to be the most efficient strategy based on our study of other languages and libraries, but we'll see how it actually works out in practice.
Hi @elizarov,
thank you for the quick response, I will stay tuned for updates.
Regarding the Semaphore-as-dispatcher idea, I realized your point too late; sorry for the miss.
However, I consider using a Mutex to limit concurrent access to a resource a valid option, and more appropriate than a custom CoroutineDispatcher.
Should we leave newSingleThreadContext defined as before (with all the context switch cost) to avoid potentially breaking existing code?
I say yes, maybe something like:
fun newDedicatedThreadDispatcher(
    threadPoolSize: Int = 1,
    threadFactory: ThreadFactory = Executors.defaultThreadFactory()
): CoroutineDispatcher
Shall we rename newFixedThreadPoolContext and newSingleThreadContext after this rewrite or leave their names as is? Can we name it better?
I believe we shall, because we can. Something like Dispatchers.fixedPool is more in line with the current naming.
Moreover, I don't think we need a separate factory method to create a single threaded pool. It seems like an obvious case of a fixed-size pool, even if some optimization is happening under the covers. Using Dispatchers.fixedPool(size = 1) should cover all the use cases.
What should I do if I need a guarantee that my thread pool is never blocked (for more than 50 ms)? For example, I use a coroutine as a timer:
launch {
    delay(1000)
    println("step 1")
    delay(1000)
    println("step 2")
    delay(1000)
    println("step 3")
}
This code fails if, for some reason, all the threads in the pool are blocked (doing CPU-bound work) and the scheduler is unable to switch this coroutine to a thread in time. Will the new pool be able to battle that?
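One possible mitigation (an assumption on my part, not part of the proposal) is to give time-critical coroutines their own dedicated thread, so CPU-bound work in the shared pool cannot delay them:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() {
    // Timers run on their own thread, isolated from the shared pool.
    val timer = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    runBlocking {
        launch(timer) {
            delay(100)
            println("tick")
        }.join()
    }
    timer.close()
}
```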
Regarding my consideration above, issue #1088 is now available.
Hi, I saw the deprecation notice of https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/new-single-thread-context.html
Is there a recommended, current way to execute coroutines with FIFO semantics? Specifically I need to sequence access of SQLite database to occur from a single thread.
@justjake
Is there a recommended, current way to execute coroutines with FIFO semantics?
Specifically I need to sequence access of SQLite database to occur from a single thread.
You should not use coroutines for that.
@fvasco thank you for your reply. However, I am now more confused: why are coroutines not the right tool here? My thought was that callers that require SQLite could await sub-operations dispatched to a SQLite dispatcher that uses only a single thread. Currently, in my application, this uses a concrete SQLiteJob with callbacks pushed into a queue, a Java ThreadPool, etc., which is cumbersome and already needs a suspend / resume to be used from coroutines.
why are coroutines not the right tool here?
It is possible to run multiple coroutines _concurrently_ on a single thread, but you want to "sequence access of SQLite database to occur from a single thread".
which is cumbersome and already needs a suspend / resume to be used from coroutines
Put the logic in a function, example:
suspend fun <T> sqlContext(block: (Sqlite) -> T): T =
    executor.submit { block(sqlite) }.await()
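A runnable variant of that idea, using a single-threaded dispatcher instead of a raw executor (the `Sqlite` class here is a hypothetical stand-in for a real database handle):

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical handle that must only be touched from one thread.
class Sqlite {
    fun query(sql: String): String = "row for $sql"
}

val sqliteThread = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val sqlite = Sqlite()

// All access to the handle is confined to the dedicated thread.
suspend fun <T> sqlContext(block: (Sqlite) -> T): T =
    withContext(sqliteThread) { block(sqlite) }

fun main() = runBlocking {
    println(sqlContext { it.query("SELECT 1") })
    sqliteThread.close()
}
```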
Any reason why this executor would be better than a mutex?
The reason is that it does a different thing than a mutex:
* There is no need to close `newFixedThreadPoolContext` anymore; as it is not backed by any physical threads, there is no risk of leaking threads.
I'm really looking forward to this @elizarov. Is there any ETA / milestone for that feature?
Sorry, I cannot give any specific ETA at the moment.
Related: https://discuss.kotlinlang.org/t/coroutine-dispatcher-confined-to-a-single-thread/17978
I would need newSingleThreadContext implemented exactly in this way, to pick a thread from the default thread pool.
Is there anything I could use in the meantime?
@alamothe In the forum thread you ask:
Is there such thing as a dispatcher which “picks one worker thread from a default pool and does not jump between threads”?
This is not what this proposal is supposed to do. In this proposal, even with a limit of 1 thread, it will still jump between threads, but it will ensure that at most 1 thread is used at any time. We don't have anything out-of-the-box to cover your needs. You'll have to write your own dispatcher that supports it.
but it will ensure that at most 1 thread is used at any time.
This is actually what I need. The requirement is to prevent unintentional multi-threading (because our code is not thread-safe). My original post didn't state this in the best possible way, but later I arrived at this conclusion.
Thanks for checking!
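For readers arriving later: the "at most 1 thread at any time" behavior discussed above eventually shipped as limitedParallelism (kotlinx.coroutines 1.6+). A minimal sketch:

```kotlin
import kotlinx.coroutines.*

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    // A view of Default that uses at most one worker thread at a time;
    // the thread identity may change, but execution is serialized.
    val serial = Dispatchers.Default.limitedParallelism(1)
    var counter = 0
    coroutineScope {
        repeat(1000) {
            launch(serial) { counter++ } // serialized, so no lost updates
        }
    }
    println(counter)
}
```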
Is there any plan to support this for Kotlin/Native?