Kotlinx.coroutines: Start collecting multiple flows together without suspension deadlock?

Created on 26 Jul 2019 · 2 Comments · Source: Kotlin/kotlinx.coroutines

Let's say I have two Flows which I want to run and collect individually:


class SomeOp<T>(val source1: Flow<T>, val source2: Flow<T>) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        val gate = AtomicBoolean()
        source1.collect { gate.set(true) }
        source2.collect { if (!gate.get()) collector.emit(it) }
    }
}

However, if I run this as follows:

    @Test
    fun someOp() = runBlocking {
        SomeOp(flow<Int> {
            delay(1000)
            emit(1)
        }, flow<Int> {
            for (i in 1..10) {
                delay(200)
                emit(i)
            }
        })
        .collect { println(it) }
    }

Nothing is printed.

If I follow the suggestion to not use subscribeOn but flowOn, it still doesn't print anything:

    @Test
    fun someOp() = runBlocking {
        SomeOp(flow<Int> {
            delay(1000)
            emit(1)
        }
        .flowOn(Dispatchers.IO)
        , flow<Int> {
            for (i in 1..10) {
                delay(200)
                emit(i)
            }
        })
        .collect { println(it) }
    }

My understanding is that source1.collect doesn't return until source1 is finished, thus source2 can't start collecting.
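A minimal sketch of that sequential behavior, assuming the same gating logic as SomeOp but written with the flow builder (the name sequentialGate is hypothetical and not part of kotlinx.coroutines). Because the first collect only returns once source1 has completed, the gate is already set by the time the second collect starts, so nothing reaches the downstream collector:

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper mirroring SomeOp: both sources are collected one after the other.
fun <T> sequentialGate(source1: Flow<T>, source2: Flow<T>): Flow<T> = flow {
    val gate = AtomicBoolean()
    // Suspends until source1 has completed, so the gate is set before the next line runs.
    source1.collect { gate.set(true) }
    // Only starts afterwards; every item is filtered out because the gate is already true.
    source2.collect { if (!gate.get()) emit(it) }
}

fun main() = runBlocking {
    val result = sequentialGate(flowOf(1), flowOf(1, 2, 3)).toList()
    println(result) // prints []
}
```

No delays are needed to reproduce the empty output: it follows from the ordering of the two collect calls, not from timing.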

If I launch the delay part:

    @Test
    fun someOp() = runBlocking {
        SomeOp(flow<Int> {
            launch(Dispatchers.IO) {
                delay(1000)
                emit(1)
            }
        }
        , flow<Int> {
            for (i in 1..10) {
                delay(200)
                emit(i)
            }
        })
        .collect { println(it) }
    }

The values 1..5 are printed, along with a hefty IllegalStateException:


java.lang.IllegalStateException: Flow invariant is violated: emission from another coroutine is detected (child of "coroutine#2":StandaloneCoroutine{Active}@59fb13fe, expected child of "coroutine#1":BlockingCoroutine{Active}@49ddae39). FlowCollector is not thread-safe and concurrent emissions are prohibited. To mitigate this restriction please use 'channelFlow' builder instead of 'flow'


    at kotlinx.coroutines.flow.internal.SafeCollector$checkContext$result$1.invoke(SafeCollector.kt:79)
    at kotlinx.coroutines.flow.internal.SafeCollector$checkContext$result$1.invoke(SafeCollector.kt:12)
    at kotlin.coroutines.CombinedContext.fold(CoroutineContextImpl.kt:57)
    at kotlin.coroutines.CombinedContext.fold(CoroutineContextImpl.kt:57)
    at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:37)
    at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:30)
    at hu.akarnokd.kotlin.flow.impl.FlowPublishFunctionTest$someOp$1$1$1.invokeSuspend(FlowPublishFunctionTest.kt:30)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)
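
The mitigation named in the exception message can be sketched as follows. This is only an illustration of channelFlow for the first source (the helper name delayedOne is hypothetical); it avoids the invariant violation, but it does not by itself change the fact that SomeOp collects its two sources one after the other:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the first source from the failing test, rebuilt with channelFlow.
fun delayedOne(): Flow<Int> = channelFlow {
    // launch is scoped to the channelFlow block, which waits for its children to finish.
    launch(Dispatchers.IO) {
        delay(100)
        send(1) // send goes through a channel, so calling it from another coroutine is allowed
    }
}

fun main() = runBlocking {
    println(delayedOne().toList()) // prints [1]
}
```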

Is there a way to fix this behavior?

(I've dug into flatMapMerge, which probably has to deal with a similar situation, but its implementation goes deep into internals about undispatched coroutines, etc.)


Most helpful comment

Launch a new coroutine to do your first collect:

class SomeOp<T>(val source1: Flow<T>, val source2: Flow<T>) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        coroutineScope {
            val gate = AtomicBoolean()
            launch {
                source1.collect { gate.set(true) }
            }
            // Alternatively,
            // source1.onEach { gate.set(true) }.launchIn(this@coroutineScope)
            source2.collect { if (!gate.get()) collector.emit(it) }
        }
    }
}

The coroutineScope block will wait for that coroutine to finish, so your operator's collectSafely will still wait for both flows. Exceptions thrown from either collect will also propagate up the job hierarchy and be reported to the downstream collector.
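
Both properties can be checked in isolation. The following is a small sketch, not taken from the thread, that uses only coroutineScope, launch and delay; the helper names completionOrder and childFailure are hypothetical, and a delay stands in for the second collect:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which the pieces finish.
// Intended to be called from runBlocking, so the list is only touched by one thread.
suspend fun completionOrder(): List<String> {
    val order = mutableListOf<String>()
    coroutineScope {
        launch {
            delay(100)
            order.add("child")
        }
        order.add("body")
    } // coroutineScope returns only after the launched child has completed
    order.add("after scope")
    return order
}

// Hypothetical helper: a failing child cancels the scope, and coroutineScope rethrows.
suspend fun childFailure(): String = try {
    coroutineScope {
        launch { throw IllegalStateException("source1 failed") }
        delay(1_000) // stands in for the second collect; cancelled by the child's failure
    }
    "completed"
} catch (e: IllegalStateException) {
    "caught: ${e.message}"
}

fun main() = runBlocking {
    println(completionOrder()) // prints [body, child, after scope]
    println(childFailure())    // prints caught: source1 failed
}
```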

Let me add that it would be even better if this code did _not_ extend AbstractFlow, but used a flow { ... } builder instead. Two advantages: the flow { ... } builder is supported, and you save one level of indentation.
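
A sketch of what that could look like, assuming the same gate logic as the SomeOp answer above. The function name someOp and the timings in main are illustrative choices, not part of the original answer:

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical function name; same behavior as SomeOp, without extending AbstractFlow.
fun <T> someOp(source1: Flow<T>, source2: Flow<T>): Flow<T> = flow {
    coroutineScope {
        val gate = AtomicBoolean()
        // Collect source1 concurrently; it only flips the gate and never emits.
        launch { source1.collect { gate.set(true) } }
        // Emit from the collecting coroutine itself, so the flow invariant holds.
        source2.collect { if (!gate.get()) emit(it) }
    }
}

fun main() = runBlocking {
    val source1 = flow {
        delay(500)
        emit(0)
    }
    val source2 = flow {
        for (i in 1..5) {
            delay(200)
            emit(i)
        }
    }
    // Prints the values source2 emitted before source1 produced its first item.
    println(someOp(source1, source2).toList())
}
```

Note that source2 keeps being collected after the gate closes; its remaining items are simply dropped until it completes.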
