Let's say I have two Flows which I want to run and collect individually:
class SomeOp<T>(val source1: Flow<T>, val source2: Flow<T>) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        val gate = AtomicBoolean()
        source1.collect { gate.set(true) }
        source2.collect { if (!gate.get()) collector.emit(it) }
    }
}
However, if I run this as follows:
@Test
fun someOp() = runBlocking {
    SomeOp(
        flow<Int> {
            delay(1000)
            emit(1)
        },
        flow<Int> {
            for (i in 1..10) {
                delay(200)
                emit(i)
            }
        }
    )
        .collect { println(it) }
}
Nothing is printed.
If I follow the suggestion to not use subscribeOn but flowOn, it still doesn't print anything:
@Test
fun someOp() = runBlocking {
    SomeOp(
        flow<Int> {
            delay(1000)
            emit(1)
        }.flowOn(Dispatchers.IO),
        flow<Int> {
            for (i in 1..10) {
                delay(200)
                emit(i)
            }
        }
    )
        .collect { println(it) }
}
My understanding is that source1.collect doesn't return until source1 is finished, thus source2 can't start collecting.
If I launch the delay part:
@Test
fun someOp() = runBlocking {
    SomeOp(
        flow<Int> {
            launch(Dispatchers.IO) {
                delay(1000)
                emit(1)
            }
        },
        flow<Int> {
            for (i in 1..10) {
                delay(200)
                emit(i)
            }
        }
    )
        .collect { println(it) }
}
1..5 is printed along with a hefty IllegalStateException:
java.lang.IllegalStateException: Flow invariant is violated: emission from another coroutine is detected (child of "coroutine#2":StandaloneCoroutine{Active}@59fb13fe, expected child of "coroutine#1":BlockingCoroutine{Active}@49ddae39). FlowCollector is not thread-safe and concurrent emissions are prohibited. To mitigate this restriction please use 'channelFlow' builder instead of 'flow'
at kotlinx.coroutines.flow.internal.SafeCollector$checkContext$result$1.invoke(SafeCollector.kt:79)
at kotlinx.coroutines.flow.internal.SafeCollector$checkContext$result$1.invoke(SafeCollector.kt:12)
at kotlin.coroutines.CombinedContext.fold(CoroutineContextImpl.kt:57)
at kotlin.coroutines.CombinedContext.fold(CoroutineContextImpl.kt:57)
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:37)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:30)
at hu.akarnokd.kotlin.flow.impl.FlowPublishFunctionTest$someOp$1$1$1.invokeSuspend(FlowPublishFunctionTest.kt:30)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)
Is there a way to fix this behavior?
(I've dug into flatMapMerge, which probably has to deal with a similar situation, but the implementation goes into deep internals about undispatched things, etc.)
Launch a new coroutine to do your first collect:
class SomeOp<T>(val source1: Flow<T>, val source2: Flow<T>) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        coroutineScope {
            val gate = AtomicBoolean()
            launch {
                source1.collect { gate.set(true) }
            }
            // Alternatively,
            // source1.onEach { gate.set(true) }.launchIn(this@coroutineScope)
            source2.collect { if (!gate.get()) collector.emit(it) }
        }
    }
}
The coroutineScope block will wait for that coroutine to finish, so your operator's collectSafely will still wait for both flows. Exceptions thrown from either collect will also propagate up the job hierarchy and be reported to the downstream collector.
Let me add that it would be even better if this code did _not_ extend AbstractFlow and used the flow { ... } builder instead. Two advantages: the flow { ... } builder is supported, and you save one level of indentation.
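For illustration, the flow { ... } variant could look roughly like the sketch below. The function name someOp and the main function are assumptions made for this example, not something from the thread; the gating logic is copied from SomeOp above. Emitting from inside coroutineScope is fine here because the emission still happens in the collecting coroutine, and only the first collect runs in the launched child.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical function name; same behavior as the SomeOp class above.
fun <T> someOp(source1: Flow<T>, source2: Flow<T>): Flow<T> = flow {
    coroutineScope {
        val gate = AtomicBoolean()
        // Collect source1 concurrently in a child coroutine; it only flips the gate.
        launch {
            source1.collect { gate.set(true) }
        }
        // Collect source2 in the current coroutine, so emit() is called
        // from the coroutine that is collecting this flow.
        source2.collect { if (!gate.get()) emit(it) }
    }
}

fun main(): Unit = runBlocking {
    val gateSource = flow<Int> {
        delay(1000)
        emit(1)
    }
    val values = flow<Int> {
        for (i in 1..10) {
            delay(200)
            emit(i)
        }
    }
    // Prints the values of the second flow that arrived before the first flow emitted.
    println(someOp(gateSource, values).toList())
}
```

As with the class version, the flow completes only after both sources have completed, since coroutineScope waits for the launched child.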