Kotlinx.coroutines: Flow.collect receives items after it was cancelled on a single thread

Created on 10 Jun 2019 · 13 Comments · Source: Kotlin/kotlinx.coroutines

Could anyone explain to me why this code throws an exception (on Android)? The collect and cancel functions are both called on the main thread. I tried this with both version 1.2.1 and 1.3.0-M1.

var cancelled = false
val job = GlobalScope.launch(Dispatchers.Main) {
    val flow = flow {
        while (true) {
            emit(Unit)
        }
    }
    flow.flowOn(Dispatchers.IO)
        .collect {
            if (cancelled) { // main thread
                throw IllegalStateException() // this exception is thrown
            }
        }
}

GlobalScope.launch(Dispatchers.Main) {
    delay(1000)
    job.cancel() // main thread
    cancelled = true // set to true after the Flow was cancelled.
}

Here is an Android sample project, just run the app and it will crash.

I'm not sure if this is intended behaviour or a bug. I need to make sure all Flows are cancelled in the Android onDestroy so they do not try to update the view if it is already gone. So if this is not a bug I am not sure how I would need to handle this otherwise.

There is a long discussion about this with more code examples on Slack: https://kotlinlang.slack.com/archives/C1CFAFJSK/p1559908916083600

We had some trouble trying to reproduce this in a unit test (it did not throw the exception), so that's why I attached an Android project.

design flow

All 13 comments

Coroutines cancellation is _cooperative_, but your collect implementation does not cooperate with coroutine cancellation mechanics. It checks its own cancelled flag. I'm not sure why you assume that it is somehow supposed to work. You are advised to review this section of the guide on the approaches you can use to make your code cooperate with cancellation:
https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html

Hi @elizarov,

I believe the purpose of the example was to point out that collect calls its lambda without checking for cancellation first. If the example ignores cancelled and checks !isActive instead, the result is the same.

The documentation you linked states:

All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled.

I think it's easy for developers to gather from this that collect will check for cancellation before calling its provided lambda. This also seems like an intuitive assumption for developers to make.

Here's a use case I've dealt with where this seems especially problematic:

  • An Activity uses C libraries to share code with other platforms
  • Those C libraries are manipulated based on events from objects that live on after the Activity
  • The Activity cleans up its native resources and subscriptions in onDestroy

Note: no thread switching is involved, only different lifetimes between source and observer

Currently, this code base uses RxJava, but if it were naively switched to use Flow and collect, it could result in a corrupted heap due to collect calling a lambda while no longer active that manipulates freed native resources.

While creating a custom collectWhileActive terminal operator is trivial, it seems quite easy for other developers in a similar situation to not know they need such an operator.

But if you have an analogue of while (true) { emit(Unit) } in Rx you'll have the same problem. The real code would be cooperating with cancellation in some way, because it will be invoking some function that checks for cancellation.

I've found that Rx does not have the same problem because Rx does not have atomic cancellation (figured out that's the issue here).

Rx's ObservableObserveOn checks if it is disposed on the observe-side thread. Kotlin's ChannelFlow does no such check so it inherits Channel's atomic cancellation behavior.

This reframes the issue in my mind as: Should Flow operators like flowOn use atomic cancellation?

I'm leaning towards no, myself. A single Channel can guarantee delivery OR cancellation, but a flow may be built from multiple channels (multiple flowOns with a suspending map between, for example) which means that the benefit of atomic cancellation is something that is easily lost end-to-end.

I'm also thinking it'll help more developers avoid race-condition bugs than if the behavior is left as-is.

And if a developer really does need atomic cancellation, they can still use a Channel.

Note: I've since noticed Issue 1177 which is similar. That specific repro was fixed by changing withContext but the race condition still exists for flowOn and collect.

Here's an example that has failed consistently for me on Android in an Activity's onCreate:

    var timeToCancel = false
    GlobalScope.launch(Dispatchers.Main) {
        flow {
            while (isActive) {
                delay(100)
                if (timeToCancel) {
                    runOnUiThread { //Pretend unfortunate timing of onDestroy
                        cancel()
                    }
                }
                emit(Unit)
            }
        }.flowOn(Dispatchers.IO).collect {
            if (!isActive) {
                throw IllegalStateException("How'd I get here?!")
            }
        }
    }
    GlobalScope.launch(Dispatchers.Main) {
        delay(1000)
        timeToCancel = true
    }

I have replaced every single collect with a safeCollect function in my project:

/**
 * Only proceed with the given action if the coroutine has not been cancelled.
 * Necessary because Flow.collect receives items even after coroutine was cancelled
 * https://github.com/Kotlin/kotlinx.coroutines/issues/1265
 */
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (T) -> Unit) {
  collect {
    coroutineContext.ensureActive()
    action(it)
  }
}

It would be great if this could be implemented as the default, as this is really unexpected behavior and it's error-prone to always have to remember not to use specific library functions.
On Android my collect functions are always right before they touch the views, so they must not get invoked after the view is gone.
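
A minimal usage sketch of the safeCollect operator above, assuming kotlinx-coroutines-core is on the classpath. The function name safeCollectDemo is hypothetical. The source is IntRange.asFlow(), which the coroutines guide describes as not performing cancellation checks on its own, so the ensureActive() call is what stops the remaining items from reaching the action.

```kotlin
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same operator as in the comment above, repeated so the sketch is self-contained.
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (T) -> Unit) {
    collect {
        coroutineContext.ensureActive() // throws CancellationException once the collector is cancelled
        action(it)
    }
}

// Hypothetical demo: the collecting coroutine cancels itself while handling the first item.
fun safeCollectDemo(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch {
        (1..3).asFlow().safeCollect { value ->
            seen += value
            if (value == 1) cancel() // cancels the enclosing launch scope
        }
    }
    job.join()
    seen // items 2 and 3 never reach the action
}
```

The check costs one extra context lookup per item, which is probably negligible for UI-bound collectors.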

What's really strange with this example is that emit should suspend and throw when cancelled but for some reason it doesn't. Adding a yield() inside the loop makes no difference either.

So what baffles me is why the coroutine that is collecting isn't throwing a cancellation exception. When the job is cancelled that coroutine is suspended and should be cancelled atomically?

As I understand it, using flowOn creates a channel under the hood, so it is essentially the same as producing items into a buffered channel and consuming them on the other side? It feels like consuming a channel on the main thread should not be able to lead to this condition if cancellation is also done on the main thread.

Here is another really strange example that behaves very differently than expected (using channels directly):

runBlocking {
    val c = Channel<Int>(Channel.BUFFERED)
    val job = launch {
        for (i in c) {
            println("isActive = $isActive")
            println("Got $i")
        }
    }

    println("Sending 1")
    c.send(1)
    launch {
        println("Sending 2")
        c.send(2)
        println("Cancelling")
        job.cancel()
        println("Sending 3")
        c.send(3)
    }
}

I would expect the snippet above to print:

Sending 1
isActive = true
Got 1
Sending 2
Cancelling
Sending 3

But it actually prints:

Sending 1
isActive = true
Got 1
Sending 2
Cancelling
Sending 3
isActive = false
Got 2
isActive = false
Got 3

@ansman There is nothing strange with your Channel example. It behaves so by design, because receiving from a channel is an atomic operation. Details of that are documented here: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
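
A minimal sketch of one way a consumer can cooperate with cancellation regardless of how receive behaves: call ensureActive() before handling each received element. The function name consumeWhileActive and the returned list are illustrative assumptions; the channel setup mirrors the example above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: returns the elements the consumer actually processed.
fun consumeWhileActive(): List<Int> = runBlocking {
    val processed = mutableListOf<Int>()
    val c = Channel<Int>(Channel.BUFFERED)
    val job = launch {
        for (i in c) {
            ensureActive() // stop here if the job was cancelled while the element was in flight
            processed += i
        }
    }

    c.send(1)
    yield()      // let the consumer handle element 1 and suspend again
    c.send(2)
    job.cancel() // cancelled before the consumer gets to run again
    c.send(3)
    job.join()
    c.close()
    processed
}
```

Elements 2 and 3 are sent successfully but are never processed, which is the behavior the earlier comment expected.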

This is pretty unexpected to me but explains the issue with flows. This does feel like a very dangerous behavior on Android, where objects tend to be nulled out in response to lifecycle methods. This would definitely not happen with Rx since, as @nickallendev said, Rx checks for cancellation on both the producer and consumer sides.

It does not have to be this way for flow. I don't believe flows should support atomic value transfer across different threads/dispatchers. I'll submit a PR with a fix for flows.

@elizarov am I mistaken about the fix merged in #1937? I am using 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2' and I am expecting that when I have a flow which is canceled then that flow should be signaling the cancellation to any _ongoing_ cancellable work in its downstream associated collector. But what I see is that the cancellable work that is kicked off inside my collector continues to run even after the flow that kicked it off has been cancelled. This is on Android where I have a simple activity with one button:

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val currentClick = MutableStateFlow(0)

        lifecycleScope.launch {
            currentClick.flatMapLatest { currentClickCount ->
                flow {
                    var endlessCounter = 1
                    while (true) {
                        emit(Pair(currentClickCount, endlessCounter++))
                        delay(6000)
                    }
                }
            }.collect { (currentClickCount, endlessCounter) ->
                doSomething(currentClickCount, endlessCounter)
            }
        }

        findViewById<Button>(R.id.button).setOnClickListener {
            Log.i(TAG, "Button clicked: should cancel any ongoing work in doSomething and start new stream")
            lifecycleScope.launch { currentClick.emit(currentClick.value + 1) }
        }
    }

    private suspend fun doSomething(currentClickCount: Int, endlessCounter: Int) {
        Log.i(TAG, "doSomething 1: currentClickCount: $currentClickCount, endlessCounter: $endlessCounter ")
        delay(2000)
        Log.i(TAG, "doSomething 2: currentClickCount: $currentClickCount, endlessCounter: $endlessCounter ")
        delay(2000)
        Log.i(TAG, "doSomething 3: currentClickCount: $currentClickCount, endlessCounter: $endlessCounter ")
    }
}

I was expecting that when I click this button and emit a new value on my currentClick StateFlow, flatMapLatest would cancel the previous inner endless flow and kick off another one, the collector for the cancelled flow would be signalled, and the cancellation would propagate down to my doSomething suspend method, whose delay() calls cooperate with cancellation. Instead, when I click the button, the previous collector invocation of doSomething continues until all of its delay() calls have completed, even if I cancel the source flow right after the first delay() inside doSomething.

Here are the logs I see in this scenario:

2021-02-10 15:50:05.587 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 1: currentClickCount: 0, endlessCounter: 1 
2021-02-10 15:50:07.593 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 2: currentClickCount: 0, endlessCounter: 1 
2021-02-10 15:50:09.599 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 3: currentClickCount: 0, endlessCounter: 1 
2021-02-10 15:50:11.588 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 1: currentClickCount: 0, endlessCounter: 2 
2021-02-10 15:50:13.597 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 2: currentClickCount: 0, endlessCounter: 2 
2021-02-10 15:50:15.604 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 3: currentClickCount: 0, endlessCounter: 2 
2021-02-10 15:50:17.595 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 1: currentClickCount: 0, endlessCounter: 3 
2021-02-10 15:50:18.571 24901-24901/com.example.launchdarrker I/GeoMain1: Button clicked: should cancel any ongoing work in doSomething and start new stream
2021-02-10 15:50:19.598 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 2: currentClickCount: 0, endlessCounter: 3 
2021-02-10 15:50:21.607 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 3: currentClickCount: 0, endlessCounter: 3 
2021-02-10 15:50:21.607 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 1: currentClickCount: 1, endlessCounter: 1 
2021-02-10 15:50:23.610 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 2: currentClickCount: 1, endlessCounter: 1 
2021-02-10 15:50:25.614 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 3: currentClickCount: 1, endlessCounter: 1 

@gpartida

#1937 stops the lambda passed to collect from being invoked if lifecycleScope has already been cancelled.

Flow implementations are required to emit items with the same CoroutineContext as was used to call collect (this is called context preservation). This means the behavior you were expecting is actually impossible with any conforming implementation. Cancelling the work downstream is only possible by cancelling the entire collection.
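
A small sketch of what context preservation means in practice, assuming a reasonably recent kotlinx.coroutines version: emitting from a different context inside the flow builder is rejected with an IllegalStateException. The function name emitFromOtherContextFails is hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical check: returns true if the flow builder rejects an emission
// made from a context other than the collector's.
fun emitFromOtherContextFails(): Boolean = runBlocking {
    try {
        flow {
            // Switching context around emit violates context preservation.
            withContext(Dispatchers.Default) { emit(1) }
        }.collect { }
        false
    } catch (e: IllegalStateException) {
        true
    }
}
```

Because the collector's lambda always runs in the collector's own context, it cannot be cancelled separately from the collection itself.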

To get your desired behavior, you can call collectLatest and perform all the work you want to be cancelled when a new event arrives within the provided lambda.
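
A minimal sketch of that suggestion, with a plain flow standing in for the StateFlow and short delays standing in for doSomething. The function name collectLatestDemo and the log strings are illustrative assumptions; depending on the library version, collectLatest may require opting in to experimental API.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: the work for value 1 is cancelled when value 2 arrives.
fun collectLatestDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        emit(1)
        delay(50) // the second value arrives while the first is still being handled
        emit(2)
    }.collectLatest { value ->
        log += "start $value"
        delay(200) // cancellable work; interrupted if a newer value shows up
        log += "done $value"
    }
    log
}
```

Here "done 1" is never logged, because the block handling value 1 is cancelled at its delay() when value 2 is emitted.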

Note: instead of .flatMapLatest { flow { ... } } consider .transformLatest { ... }

@nickallendev thanks! collectLatest is exactly what I was missing. As for flatMapLatest vs transformLatest I had just come up with a toy example of my actual code to explain my question. In my actual code I take the emission of one flow and for each one I call another method which itself returns a Flow. I'm wondering if there is still any reason to prefer transformLatest with maybe emitAll(flowReturningFunc()) over flatMapLatest in this scenario?
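
For comparison, a sketch that runs both forms side by side. As far as I can tell, flatMapLatest is a thin wrapper around transformLatest plus emitAll, so under that assumption the two should behave the same when the inner function already returns a Flow. The names lookup, viaFlatMapLatest and viaTransformLatest are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical flow-returning function standing in for the real one.
fun lookup(n: Int): Flow<String> = flow {
    emit("$n-a")
    delay(200)
    emit("$n-b")
}

// Source that emits a second value while the first inner flow is still running.
private fun source(): Flow<Int> = flow {
    emit(1)
    delay(50)
    emit(2)
}

@OptIn(ExperimentalCoroutinesApi::class)
fun viaFlatMapLatest(): List<String> = runBlocking {
    source().flatMapLatest { lookup(it) }.toList()
}

@OptIn(ExperimentalCoroutinesApi::class)
fun viaTransformLatest(): List<String> = runBlocking {
    source().transformLatest { emitAll(lookup(it)) }.toList()
}
```

If that assumption holds, transformLatest mainly pays off when the inner values are emitted directly rather than wrapped in a separate flow { ... } builder.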
