Kotlinx.coroutines: Flow.retry() and cancellation

Created on 20 Apr 2019  路  5Comments  路  Source: Kotlin/kotlinx.coroutines

The implementation of Flow.retry() differentiates between upstream exceptions that should potentially trigger a retry and downstream exceptions that should pass through by wrapping the call to emit. This has unexpected behavior in the case of cancellation when the operation is suspended on a call that is technically upstream.

Consider:

val reader = flow {
  while (hasMoreEvents) {
    emit(suspendingReadEvent())
  }
}

val collectorJob = launch {
  reader.retry().collect { event ->
    handle(event)
  }
}

// Some time later, as part of an external signal
collectorJob.cancelAndJoin()

The above can potentially never join. retry attempts to start a new collect from upstream after catching the CancellationException that did not originate from the inner emit call within retry's implementation, and the new collect on the upstream source quickly fails because the calling job is cancelling. Retry dutifully catches the CancellationException again and repeats the process.

The problem can be avoided by explicitly allowing CancellationException to pass through via the predicate, e.g. reader.retry { it !is CancellationException } though perhaps this should be the default behavior.

flow

Most helpful comment

Yes. We are thinking on how to generalize this solution.

All 5 comments

I agree that this behavior is counter-intuitive and should be fixed.
But it is not that simple to fix, for example:

val flow = flow<Int> {
    withTimeout(someTimeout) {
        val value  = somePotentiallyTimedOutSuspendingCall()
    }
    emit(value)
}.retry { it !is CancellationException }

withTimeout interfere with retry in a strange way here

Would checking the collecting job's isActive be appropriate?

No, because it is prone to check-and-act data races.
Though I think there exists a solution that wraps the emission into coroutineScope, I will try to prototype it

Are there plans to make the underpinnings of this check public API that could be used from operators defined outside of kotlinx.coroutines?

Yes. We are thinking on how to generalize this solution.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

Pitel picture Pitel  路  3Comments

elizarov picture elizarov  路  3Comments

ScottPierce picture ScottPierce  路  3Comments

elizarov picture elizarov  路  3Comments

mttmllns picture mttmllns  路  3Comments