Currently, Flow cancellation is hard to get right. For example, the following code:
(0..99).asFlow().onEach { if (it > 10) coroutineContext.cancel() }.collect { println(it) }
will print 100 lines with no easy way to prevent this behaviour in a more generic case.
One may decide to rewrite the source in the following manner:
flow {
while (isActive) {
// emit one more element
}
}
but this is incorrect because isActive can only be captured from an enclosing CoroutineScope (whether it is other runBlocking or MyActivity : CoroutineScope).
Making each emit call cancellation friendly is impractical for performance reasons, e.g. when no cancellation and/or thread-safety is involved.
flow {} builder emit methods cancellable. This builder already performs various checks to preserve both exception and context transparency, so adding one more is negligiblecancellable() operator for the flows that are not cancellable by defaultcurrentCoroutineContext() method that retrieves kotlin.coroutines.coroutinesContext and does not clash with CoroutineScope memberval FlowCollector<*>.coroutineContext // Replacement: currentCoroutineContext()
fun FlowCollector<*>.cancel(cause: CancellationException? = null) // Replacement: currentCoroutineContext().cancel()
val FlowCollector<*>.isActive // Replacement: currentCoroutineContext().isActive
The scope of this issue is clear, but isn't not clear the difference between currentCoroutineContext() and coroutineContext outside this issue.
What "current" stand for?
Why coroutineContext should not be the "current"?
@fvasco Consider the following example:
launch { // this: CoroutineScope
val s1 = coroutineScope // resolves to this.coroutineScope
val s2 = kotin.coroutines.coroutineScope // top-level
check(s1 === s2) // always true. That's how "launch" and other builders are designed
}
But flow is special (why -- for performance reasons and because flow is cold). Consider this code:
launch { // this: CoroutineScope
val flow = flow { // this: FlowCollector
val s1 = coroutineScope // resolves to [email protected]
val s2 = kotin.coroutines.coroutineScope // top-level
check(s1 === s2) // May fail if flow is collected in a different context
}
}
currentCoroutineScope() function is introduced to provide a shortcut for kotlin.coroutines.coroutineScope that is nicer to write.
Hi @elizarov, thank you for response.
I suggest you to reread your reply, I consider it a little tricky, it is not really easy to understand for a newbie.
Can you consider flowCoroutineScope, flowCollectorScope or something else?
@fvasco It is tricky, indeed, because you can accidentally capture the (wrong) outer context. There's not much we can do, but to warn on potentially invalid code. That's what we do, too.
As for naming, it is not clear how flowCoroutineScope is better than currentCoroutineContext. The problem with the latter name is that the flow is cold, flow itself does not have any context. But when there is a current collector, then it works in some scope and has context. Hence currentCoroutinContext. We could have also called it something like collectorCoroutineContext, but the problem with this name is that we cannot restrict this function to be used only in flow -- it is a generic shortcut to kotlin.coroutiens.coroutineCotext that you can use anywhere and in most cases there is no difference between a plain unqualified coroutineContext and currentCoroutineContext(). It is only in the flow (because it is cold), the difference could be observed.
Thank you, @elizarov,
what do you mean with kotlin.coroutines.coroutineScope?
kotlin.coroutines.coroutineContext?
Maybe the best solution is to provide kotlin.coroutines.coroutineContext as property coroutineContext of FlowCollector, so to add a new field in the interface, but is too late to modify the interface and too early to define a inline suspend val :-/
what do you mean with
kotlin.coroutines.coroutineScopekotlin.coroutines.coroutineContext?
Thanks, indeed. Corrected.
Most helpful comment
The scope of this issue is clear, but isn't not clear the difference between
currentCoroutineContext()andcoroutineContextoutside this issue.What "current" stand for?
Why
coroutineContextshould not be the "current"?