interface TestPreconditionVoter {
fun vote(context: Map<String, Any>): Vote
}
and voters = List<TestPreconditionVoter>()
Based on a given boolean flag, I'd like to do the following:
true: execute the votes in parallel.
false: execute the votes sequentially.
Optionally, I'd like to replace the Map<String, Any> with something from the CoroutineContext that can be used as a temporary bag of data to pass from one voter to another. Note that in the first case, such a data structure would have to be thread-safe.
What'd be an idiomatic way to do this? Currently I've the following WIP code but I suspect there is a better way.
runBlocking {
withTimeout(vote.timeoutMillis, TimeUnit.MILLISECONDS) {
if (vote.castingStrategy == PARALLEL) {
voters
.map { async { it.vote(context) } }
.groupBy({ it.await() }, { it.await() })
.mapValues { it.value.size }
} else {
voters
.map { it.vote(context) }
.groupingBy { it }?.eachCount()
}
}
}
Kotlin 1.2.31, Coroutine 0.22.5
Some refinement of the above code
val context = ConcurrentHashMap<String, Any>()
val dispatcher =
if (vote.castingStrategy == PARALLEL)
CommonPool
else
newSingleThreadContext("touchstone.vote")
val voteMap = runBlocking {
try {
voters
.map { async(dispatcher) { it.vote(context) } }
.groupBy { withTimeout(vote.timeoutMillis) { it.await() } }
.mapValues { it.value.size }
} catch (e: TimeoutCancellationException) {
throw PreconditionFailedException(cause = e)
} finally {
if (vote.castingStrategy != PARALLEL)
(dispatcher as? ThreadPoolDispatcher)?.close()
}
}
Note: For some reason, groupingBy { it.await() }?.eachCount() doesn't compile with "Suspension functions can be called only within coroutine body".
This is a good use case for the not-yet-implemented https://github.com/Kotlin/kotlinx.coroutines/issues/172, thanks.
The solution that chooses the context depending on the flag is fine, except that a single-thread executor won't give any speedup (also, it will be destroyed after first voteMap computation and all consecutive computations of voteMap will fail).
Optionally, I'd like to replace the Map<String, Any> with something from the CoroutineContext that can be used as a temporary bag of data to pass from one voter to another
Consider using CoroutineContext.Key:
class VotersContext(val context: MutableMap<String, Any>) : AbstractCoroutineContextElement(VotersContextKey)
object VotersContextKey: CoroutineContext.Key<VotersContext>
fun computeVotes() {
val voteContext: MutableMap<String, Any> = ...
val ctx = dispatcher + VotersContext(voteContext)
// Launch jobs
... async(ctx) { it.vote(coroutineContext) }
}
fun vote(context: CoroutineContext) {
val votersContext = context[VotersContextKey]!!.context
...
}
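A slightly more compact variant of the same idea, as a minimal sketch: the element's companion object serves as its own key, which is the pattern the standard context elements follow. It assumes Kotlin 1.3 or later (the non-experimental kotlin.coroutines package), not the 1.2.31 used in this thread; the names VotersContext, data and votersData are hypothetical.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical element carrying the shared "bag of data".
// The companion object doubles as the lookup key.
class VotersContext(
    val data: MutableMap<String, Any> = ConcurrentHashMap()
) : AbstractCoroutineContextElement(VotersContext) {
    companion object Key : CoroutineContext.Key<VotersContext>
}

// Looks the bag up in any context; returns null when the element was never added.
fun votersData(context: CoroutineContext): MutableMap<String, Any>? =
    context[VotersContext]?.data
```

Inside a suspending voter the lookup would be votersData(coroutineContext). The null-returning accessor avoids the !! from the snippet above when the element is missing.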
For some reason, groupingBy { it.await() }?.eachCount() doesn't compile with "Suspension functions can be called only within coroutine body".
It's because the lambda passed to groupingBy is not inlined into the caller (groupingBy wraps it in a lazy Grouping object, so it cannot be inlined in general), and suspension is possible only from other suspending functions (the documentation may be helpful for understanding the limitations).
@qwwdfsad Thanks for your response.
it will be destroyed after first voteMap computation
Are you referring to the close method call? I did that based on the docs
NOTE: The resulting [ThreadPoolDispatcher] owns native resources (its thread).
Resources are reclaimed by [ThreadPoolDispatcher.close].
If calling close is not desired, what is the above warning for? Also, it appears from a quick glance at the code that newSingleThreadContext creates a new ThreadPoolDispatcher every time, so I'm not sure why you think "all consecutive computations of voteMap will fail".
As for the votersContext, I think I'm going to stick with a simple map. The recursive usage of CoroutineContext containing Elements that are CoroutineContext themselves seems too complicated for this case.
Yes, if this is the complete sample (i.e. you create and destroy newSingleThreadContext every time, and this is intended), then it's okay.
A more common pattern is to create newSingleThreadContext or newFixedThreadPoolContext once and close it before application termination.
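A rough sketch of that "create once, close at shutdown" pattern. It assumes a current kotlinx.coroutines 1.x release rather than 0.22.5, and it swaps newFixedThreadPoolContext for a plain executor converted with asCoroutineDispatcher; VotingPool and runAll are hypothetical names.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical holder: one pool for the whole application, closed exactly once.
class VotingPool(threads: Int) : AutoCloseable {
    private val dispatcher: ExecutorCoroutineDispatcher =
        Executors.newFixedThreadPool(threads).asCoroutineDispatcher()

    // Runs every task on the shared pool and waits for all results, in input order.
    fun <T> runAll(tasks: List<() -> T>): List<T> = runBlocking {
        tasks.map { task -> async(dispatcher) { task() } }.awaitAll()
    }

    // Shuts the underlying executor down; call it before the application exits.
    override fun close() = dispatcher.close()
}
```

The pool can serve any number of runAll calls; only a call made after close() would fail, which is the failure mode mentioned earlier for a dispatcher that is closed too early.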
A simple footnote:
using newSingleThreadContext does imply executing the coroutines on only one thread (as JavaScript does); however, the async blocks are not mutually exclusive.
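To illustrate what "not mutually exclusive" can look like, here is a small sketch assuming a current kotlinx.coroutines 1.x release. It uses runBlocking's own single thread instead of newSingleThreadContext, and interleavingTrace is a hypothetical name. Both blocks run on one thread, yet the second block gets to run while the first is suspended.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Records the order in which two async blocks run on runBlocking's single thread.
fun interleavingTrace(): List<String> = runBlocking {
    // No locking needed: only one thread ever touches this list.
    val trace = mutableListOf<String>()
    val first = async {
        trace.add("first:start")
        yield() // suspension point: another coroutine may run here
        trace.add("first:end")
    }
    val second = async {
        trace.add("second:start")
        yield()
        trace.add("second:end")
    }
    awaitAll(first, second)
    trace
}
```

A block that never suspends, such as a plain non-suspending vote call, cannot be interleaved this way, so on a single thread such blocks effectively run one after another.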
@fvasco
the async blocks are not mutually exclusive
What does that mean for my use case, where I want to run the coroutines sequentially?
@qwwdfsad
documentation may be helpful for understanding limitations
Could you be a little more specific, please? I'm not finding anything in https://kotlinlang.org/docs/reference/coroutines.html to support what you said about inline.
Hi @asarkar
Do you want to execute code asynchronously? Use launch/async
Do you want to execute mutually exclusive blocks? Use Mutex
Do you want to execute tasks sequentially? Use actor
Context is not the right abstraction to delegate this behavior.
In the example
https://github.com/Kotlin/kotlinx.coroutines/blob/master/coroutines-guide.md#coroutines-are-like-daemon-threads
The launch blocks are executed concurrently even though the DefaultContext's thread pool has fewer than 1000 threads (it works perfectly even with only one thread).
@fvasco It doesn't look like you took the time to understand my use case described in the original post. I'm not looking for a generic "when to use coroutines" guidelines. I'm following up on your comment that I assumed was made to bring out a problem in what was being discussed. If that's not the case, and we can ignore your comment about async as far as my use case is concerned, please say so.
The code in your opening post apparently works.
Some considerations:
vote is not a suspending function, so it always runs to completion (regardless of withTimeout).
The async block runs in a context different from withTimeout's context, so it always runs to completion (regardless of withTimeout).
The async concurrency level is the DefaultExecutor concurrency level, and the block never invokes any suspending function, so the parallelism is limited (unlike the case in the guide).
That code is more or less the same as:
if (vote.castingStrategy == PARALLEL) {
ForkJoinExecutor.submit(...).wait()
} else {
voters.map { it.vote(context) }.groupingBy { it }?.eachCount()
}
if ( timeExpired ) throw CancellationException()
If the above statements are right then I suppose you should review your code, and the proposed refinement is not a great enhancement.
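The first consideration can be checked with a small experiment. This is a sketch only, assuming a current kotlinx.coroutines 1.x release; Thread.sleep stands in for a non-suspending vote, and all function names are hypothetical.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Measures how long (in ms) a body wrapped in withTimeoutOrNull keeps the caller waiting.
fun elapsedUnderTimeout(timeoutMs: Long, body: suspend () -> Unit): Long = runBlocking {
    val start = System.nanoTime()
    withTimeoutOrNull(timeoutMs) { body() }
    (System.nanoTime() - start) / 1_000_000
}

// Blocking work (stand-in for a non-suspending vote): the timeout cannot interrupt it.
fun blockingElapsed(): Long = elapsedUnderTimeout(50) { Thread.sleep(1_000) }

// Suspending work: delay() is cancellable, so the timeout cuts it short.
fun suspendingElapsed(): Long = elapsedUnderTimeout(50) { delay(1_000) }
```

With a 50 ms timeout, blockingElapsed() still takes about a second, while suspendingElapsed() should return shortly after the timeout fires. Cancellation is cooperative, so a body that never reaches a suspension point is never cut short.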
My friendly suggestion is to avoid coroutines for this use case, do it using multithreading (or understand better how coroutines work).
Sorry for the frankness; please do not read any arrogance into these lines, that is absolutely not my goal.
Good luck,
Vasco
@asarkar
Could you be a little more specific, please?
Sure. Paragraph about suspending functions:
Suspending functions can take parameters and return values in the same manner as regular functions, but they can only be called from coroutines and other suspending functions, as well as function literals inlined into those.
groupingBy is not a suspending function, and its lambda is not inlined into the caller (which is the runBlocking block, of type suspend CoroutineScope.() -> T), so calling a suspending function from groupingBy is prohibited. In contrast, the map method is defined as public inline fun <T, R> Iterable<T>.map and is inlined into the block, so any suspending function can be called from map.
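One way to work within that rule, as a sketch: await inside the inlined map first, then hand plain values to groupingBy. It assumes a current kotlinx.coroutines 1.x release (Dispatchers.Default replaces CommonPool there); the Vote enum and the tally function are hypothetical stand-ins for the types in this thread.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the thread's Vote type.
enum class Vote { YES, NO }

// Casts every vote concurrently, then tallies the plain results.
fun tally(voters: List<() -> Vote>): Map<Vote, Int> = runBlocking {
    voters
        .map { voter -> async(Dispatchers.Default) { voter() } } // start all votes first
        .map { it.await() }  // map is inline, so suspending here is allowed
        .groupingBy { it }   // plain values from here on: no suspension needed
        .eachCount()
}
```

The first map starts every async before the second map awaits any of them, so the votes still run concurrently.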
@qwwdfsad, @fvasco thank you for the explanation.
avoid coroutines for this use case, do it using multithreading (or understand better how coroutines work)
I went with "avoid coroutines for this use case" 😄