Kotlinx.coroutines: Improve API for cooperative cancellation in Flow

Created on 14 May 2020 · 6 Comments · Source: Kotlin/kotlinx.coroutines

Problem statement

Currently, Flow cancellation is hard to get right. For example, the following code:

(0..99).asFlow().onEach { if (it > 10) coroutineContext.cancel() }.collect { println(it) }

will print 100 lines with no easy way to prevent this behaviour in a more generic case.

One may decide to rewrite the source in the following manner:

flow {
    while (isActive) {
        // emit one more element
    }
}

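but this is incorrect because isActive can only be captured from an enclosing CoroutineScope (whether that is an outer runBlocking or MyActivity : CoroutineScope), so it reflects the scope where the flow was declared rather than the coroutine that collects it.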

Making each emit call cancellation friendly is impractical for performance reasons, e.g. when no cancellation and/or thread-safety is involved.
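One way to write such a loop so that it follows the collector rather than an enclosing scope is to read the context at collection time. A minimal sketch, assuming the currentCoroutineContext() function listed in the proposal (it is a suspend function in kotlinx.coroutines today); counter and takeThenCancel are hypothetical names used only for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical source: emits 0, 1, 2, ... until the collecting coroutine is cancelled.
fun counter(): Flow<Int> = flow {
    var next = 0
    // Reads the context of whoever is collecting, not of an enclosing CoroutineScope.
    while (currentCoroutineContext().isActive) {
        emit(next++)
    }
}

// Hypothetical helper: collects `limit` values, then cancels the collecting coroutine.
fun takeThenCancel(limit: Int): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch {
        counter().collect { value ->
            seen += value
            // Cancels the Job of the coroutine that is running this collect call.
            if (seen.size == limit) currentCoroutineContext().cancel()
        }
    }
    job.join() // the child was cancelled; joining it does not cancel runBlocking
    seen
}
```

Calling takeThenCancel(3) should return [0, 1, 2]: once the collector cancels its own Job, the loop condition becomes false and the otherwise infinite flow stops.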

Proposal

  • Make the emit method of the flow {} builder cancellable. This builder already performs various checks to preserve both exception and context transparency, so the cost of adding one more is negligible
  • Introduce a new cancellable() operator for the flows that are not cancellable by default
  • Introduce a currentCoroutineContext() method that retrieves kotlin.coroutines.coroutineContext and does not clash with the CoroutineScope member
  • Introduce the following set of deprecated declarations that work like a lint check:
val FlowCollector<*>.coroutineContext // Replacement: currentCoroutineContext()
fun FlowCollector<*>.cancel(cause: CancellationException? = null)  // Replacement: currentCoroutineContext().cancel()
val FlowCollector<*>.isActive  // Replacement: currentCoroutineContext().isActive
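To make the effect of the proposed cancellable() operator concrete, here is a hedged sketch based on the example from the problem statement. It assumes the operator behaves as kotlinx.coroutines documents it (a cancellation check on each emission) and that IntRange.asFlow() performs no such check by itself; collectUntilTen and its useCancellable parameter are hypothetical names for illustration:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects 0..99 and cancels the collecting coroutine when the value 10 arrives.
// Returns every value that reached the collector.
fun collectUntilTen(useCancellable: Boolean): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch {
        val source = (0..99).asFlow()
        // cancellable() adds a check of the collector's Job before each emission.
        val upstream = if (useCancellable) source.cancellable() else source
        upstream.collect { value ->
            seen += value
            // The replacement suggested above for the deprecated cancel() declaration.
            if (value == 10) currentCoroutineContext().cancel()
        }
    }
    job.join()
    seen
}
```

With useCancellable = true the result should be 0 through 10; with false, all 100 values reach the collector because nothing in the pipeline checks for cancellation.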

All 6 comments

The scope of this issue is clear, but the difference between currentCoroutineContext() and coroutineContext is not clear outside this issue.

What does "current" stand for?
Why should coroutineContext not be the "current" one?

@fvasco Consider the following example:

launch { // this: CoroutineScope
    val s1 = coroutineContext // resolves to this.coroutineContext
    val s2 = kotlin.coroutines.coroutineContext // top-level
    check(s1 === s2) // always true. That's how "launch" and other builders are designed
}

But flow is special (for performance reasons and because a flow is cold). Consider this code:

launch { // this: CoroutineScope
    val flow = flow { // this: FlowCollector
        val s1 = coroutineContext // resolves to this@launch.coroutineContext
        val s2 = kotlin.coroutines.coroutineContext // top-level
        check(s1 === s2) // May fail if flow is collected in a different context
    }
}

The currentCoroutineContext() function is introduced to provide a shortcut for kotlin.coroutines.coroutineContext that is nicer to write.
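The difference can be observed with a small experiment. This is only a sketch under stated assumptions: collectorNames is a hypothetical helper, and CoroutineName is used purely as a visible marker to tell the declaring context from the collecting one:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (name captured where the flow is declared, name seen while it is collected).
fun collectorNames(): Pair<String?, String?> = runBlocking(CoroutineName("outer")) {
    // Resolves to this.coroutineContext, i.e. the runBlocking scope.
    val capturedOuter = coroutineContext[CoroutineName]?.name

    // The flow is cold: this block runs only when somebody collects it.
    val f = flow {
        emit(currentCoroutineContext()[CoroutineName]?.name)
    }

    // Collect the flow in a context that carries a different name.
    val seenWhileCollecting = withContext(CoroutineName("collector")) { f.first() }

    capturedOuter to seenWhileCollecting
}
```

The pair should come out as ("outer", "collector"): the context captured from the enclosing scope belongs to the place where the flow was declared, while currentCoroutineContext() reports the context of the coroutine that actually collects it.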

Hi @elizarov, thank you for the response.

I suggest you reread your reply; I find it a little tricky, and it is not really easy for a newbie to understand.
Can you consider flowCoroutineScope, flowCollectorScope or something else?

@fvasco It is tricky, indeed, because you can accidentally capture the (wrong) outer context. There's not much we can do other than warn about potentially invalid code. That's what we do, too.

As for naming, it is not clear how flowCoroutineScope is better than currentCoroutineContext. The problem with the former name is that the flow is cold; the flow itself does not have any context. But when there is a current collector, it works in some scope and has a context. Hence currentCoroutineContext. We could have also called it something like collectorCoroutineContext, but the problem with this name is that we cannot restrict this function to be used only in flow. It is a generic shortcut to kotlin.coroutines.coroutineContext that you can use anywhere, and in most cases there is no difference between a plain unqualified coroutineContext and currentCoroutineContext(). It is only in the flow (because it is cold) that the difference can be observed.

Thank you, @elizarov,
what do you mean by kotlin.coroutines.coroutineScope?
kotlin.coroutines.coroutineContext?

Maybe the best solution would be to expose kotlin.coroutines.coroutineContext as a coroutineContext property of FlowCollector, that is, to add a new field to the interface, but it is too late to modify the interface and too early to define an inline suspend val :-/

what do you mean by kotlin.coroutines.coroutineScope? kotlin.coroutines.coroutineContext?

Thanks, indeed. Corrected.
