Hi, in my app I randomly get a ChildCancelledException. It comes from the flatMapLatest operator (previously named switchMap). I tested it on the JVM with versions 1.3.0-M2, 1.3.0-RC and 1.3.0-RC2, and I was able to reproduce it in a test:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.test.Test
class MyTest {
    @Test
    fun test(): Unit = runBlocking {
        val context = Dispatchers.Default
        val deferred = GlobalScope.async(context) {
            launch(context) {
                observeFlow().collect {}
            }
            while (true) {
                channel.send(channel.value)
            }
        }
        deferred.await()
    }

    val channel = ConflatedBroadcastChannel("")
    val flow = channel.asFlow()

    fun observeFlow() = flow.flatMapLatest {
        val flows = List(2) {
            flow {
                while (true) {
                    emit("")
                }
            }
        }
        combine(flows) { it.asList() }
    }
}
Exception in thread "DefaultDispatcher-worker-5 @coroutine#716" kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
(Coroutine boundary)
at kotlinx.coroutines.channels.AbstractChannel.registerSelectReceiveOrNull(AbstractChannel.kt:753)
at kotlinx.coroutines.channels.AbstractChannel.access$registerSelectReceiveOrNull(AbstractChannel.kt:484)
at kotlinx.coroutines.channels.AbstractChannel$onReceiveOrNull$1.registerSelectClause1(AbstractChannel.kt:732)
at kotlinx.coroutines.selects.SelectBuilderImpl.invoke(Select.kt:415)
at kotlinx.coroutines.flow.internal.CombineKt$combineInternal$2.invokeSuspend(Combine.kt:151)
Caused by: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
at kotlinx.coroutines.flow.internal.ChannelFlowTransformLatest$flowCollect$3$invokeSuspend$$inlined$collect$1.emit(Collect.kt:137)
at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAll(Channels.kt:56)
at kotlinx.coroutines.flow.FlowKt.emitAll(Unknown Source)
at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAll$1.invokeSuspend(Channels.kt)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)
Of course, this test is not my real code, but it reproduces the exception almost every time it runs. If you don't get the exception, run the test again; it usually appears within a few seconds.
Is this expected behaviour and/or is there anything I can do to fix this?
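For context: flatMapLatest cancels the inner flow it started for the previous upstream value as soon as a new upstream value arrives. Judging by the ChannelFlowTransformLatest frames in the trace, that cancellation path is where the ChildCancelledException originates. Below is a minimal sketch of the behaviour, assuming a current kotlinx.coroutines release (where flatMapLatest still requires opting in to ExperimentalCoroutinesApi). The delays are arbitrary values, chosen so that only the inner flow of the last value runs to completion.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun latestOnly(): List<String> = runBlocking {
    flowOf(1, 2, 3)
        .onEach { delay(100) } // a new upstream value roughly every 100 ms
        .flatMapLatest { n ->
            flow {
                emit("start $n")
                delay(250) // longer than the upstream interval, so it is usually cut short
                emit("end $n") // only reached by the inner flow of the last value
            }
        }
        .toList()
}
```

Calling latestOnly() should yield start 1, start 2, start 3 and then only end 3, because the inner flows for 1 and 2 are cancelled before their delay finishes.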
There was a discussion about this on Slack with @qwwdfsad: https://kotlinlang.slack.com/archives/C1CFAFJSK/p1563388954310700
Apparently, it is a bug in the underlying implementation mechanism of combine.
It will be fixed in 1.3.0, thanks for the repro!
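The repro combines a list of flows with combine(flows) { it.asList() }. For reference, the Iterable overload of combine passes the latest value of every flow to the transform as an array, and it only emits once each flow has produced at least one value. Here is a tiny, terminating sketch of that overload; the flow contents ("v0", "v1") are arbitrary.

```kotlin
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Combines two single-value flows, mirroring combine(flows) { it.asList() } from the repro.
fun combineOnce(): List<List<String>> = runBlocking {
    val flows = List(2) { i -> flowOf("v$i") }
    combine(flows) { it.asList() }.toList()
}
```

Since each source emits exactly once, the combined flow emits a single list holding both values.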
@qwwdfsad I randomly see
kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
when running something like:
return channel.consumeAsFlow()
    .first { capture ->
        if (this(capture)) {
            true
        } else {
            capture.close()
            false
        }
    }
The following code (using the channel directly) gives me another stack trace:
return channel
    .first { capture ->
        if (this(capture)) {
            true
        } else {
            capture.close()
            false
        }
    }
java.util.concurrent.CancellationException: RendezvousChannel was cancelled
at kotlinx.coroutines.channels.AbstractChannel.cancel(AbstractChannel.kt:624)
at kotlinx.coroutines.channels.ChannelsKt__Channels_commonKt.cancelConsumed(Channels.common.kt:117)
at kotlinx.coroutines.channels.ChannelsKt.cancelConsumed(Unknown Source:1)
at com.roche.greendot.camera.ImageSelectorKt.invoke(ImageSelector.kt:136)
at com.roche.greendot.camera.ImageSelectorKt$invoke$4.invokeSuspend(Unknown Source:14)
Is this related, or should I create a new ticket (and even try to create a sample for it)?
@sellmair It's better to open a new issue with a reproducer I can run.
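A related detail for the snippets above: the RendezvousChannel trace goes through cancelConsumed, which means the consuming variants cancel the channel once collection stops, and first stops collection as soon as it finds a match. Anything still buffered in a cancelled channel is discarded. The sketch below compares consumeAsFlow with receiveAsFlow, which does not cancel the channel. It assumes kotlinx.coroutines 1.5 or later for trySend and tryReceive. The firstEven helper and its even-number predicate are hypothetical stand-ins for the capture filter.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the first even element and the next element
// still receivable from the channel afterwards (null if the channel was cancelled).
fun firstEven(useConsume: Boolean): Pair<Int, Int?> = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    listOf(1, 2, 3, 4).forEach { channel.trySend(it) }
    val flow = if (useConsume) channel.consumeAsFlow() else channel.receiveAsFlow()
    val match = flow.first { it % 2 == 0 }
    // consumeAsFlow cancels the channel when collection ends, so tryReceive fails;
    // receiveAsFlow leaves the remaining elements (3 and 4) in place.
    match to channel.tryReceive().getOrNull()
}
```

With useConsume = true the result is 2 paired with null; with useConsume = false it is 2 paired with 3.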
I still see this on 1.3.2. I'm not sure whether I'm doing something wrong, but my use case is very similar to the one described above.
The issue happens when a callback inside a channelFlow, which itself runs inside flatMapLatest, calls offer after the channelFlow has already been cancelled.
Code:
private val geoQueryData: Flow<HashMap<Key, GeoLocation>> = geoQueryChannel.asFlow().flatMapLatest {
    channelFlow {
        val searchRadiusChannel = searchRadiusInKms.openSubscription()
        val searchRadiusObserver = launch {
            for (searchRadius in searchRadiusChannel) {
                delay(1000)
                it.radius = searchRadius
            }
        }
        val map = hashMapOf<Key, GeoLocation>()
        val listener = object : GeoQueryEventListener {
            override fun onKeyEntered(key: String, location: GeoLocation) {
                map[key] = location
            }
            override fun onKeyExited(key: String) {
                map.remove(key)
            }
            override fun onKeyMoved(key: String, location: GeoLocation) {
                map[key] = location
            }
            override fun onGeoQueryReady() {
                offer(map)
            }
            override fun onGeoQueryError(error: DatabaseError) {
                cancel(CancellationException("API Error", error.toException()))
            }
        }
        it.addGeoQueryEventListener(listener)
        awaitClose {
            searchRadiusObserver.cancel()
            searchRadiusChannel.cancel()
            it.removeGeoQueryEventListener(listener)
        }
    }
}.conflate().flowOn(Dispatchers.IO)
fun DatabaseReference.asChannelFlow(geoLocation: GeoLocation) = channelFlow {
    val listener = this@asChannelFlow.addValueEventListener(
        object : ValueEventListener {
            override fun onDataChange(userSnap: DataSnapshot) {
                logd("Got snap $userSnap")
                userSnap.getValue(User::class.java)?.apply {
                    pos = geoLocation.asLatLng()
                }?.let {
                    logd("Got user, offering $it")
                    offer(it)
                }
            }
            override fun onCancelled(p0: DatabaseError) {
                cancel("OnCancelled $p0", p0.toException())
            }
        }
    )
    logd("Got - asChannelFlow")
    awaitClose {
        logd("Got - asChannelFlow closing")
        this@asChannelFlow.removeEventListener(listener)
    }
}
val nearbyUsers: Flow<Resource<List<User>>> = geoQueryData.flatMapLatest {
    logd("Got from GeoQuery $it")
    combine(
        it.map { (key, geoLocation) -> db.userRef(key).asChannelFlow(geoLocation) }
    ) { usersArray ->
        logd("Got usersArray $usersArray")
        Resource.Success(usersArray.asList().sortedByDistance(centerLocation))
    }
}.conflate().flowOn(Dispatchers.IO)
The stack trace shows that the crash was caused by the line
offer(it)
inside the Firebase listener, inside the channelFlow.
2019-11-16 19:53:05.890 18263-18263/com.faztudo E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.faztudo, PID: 18263
kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
(Coroutine boundary)
at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:166)
at kotlinx.coroutines.channels.ChannelCoroutine.offer(Unknown Source:2)
at com.faztudo.common.data.NearbyUsersFlow$asChannelFlow$1$listener$1.onDataChange(NearbyUsersFlow.kt:96)
at com.google.firebase.database.core.ValueEventRegistration.fireEvent(com.google.firebase:firebase-database@@19.2.0:75)
at com.google.firebase.database.core.view.DataEvent.fire(com.google.firebase:firebase-database@@19.2.0:63)
at com.google.firebase.database.core.view.EventRaiser$1.run(com.google.firebase:firebase-database@@19.2.0:55)
at android.os.Handler.handleCallback(Handler.java:883)
at android.os.Handler.dispatchMessage(Handler.java:100)
at android.os.Looper.loop(Looper.java:214)
at android.app.ActivityThread.main(ActivityThread.java:7356)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:492)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:930)
Caused by: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
at kotlinx.coroutines.flow.internal.ChannelFlowTransformLatest$flowCollect$3$invokeSuspend$$inlined$collect$1.emit(Collect.kt:137)
at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAll(Channels.kt:56)
at kotlinx.coroutines.flow.FlowKt.emitAll(Unknown Source:1)
at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAll$1.invokeSuspend(Unknown Source:10)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(Dispatched.kt:241)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:594)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:740)
After reading #1454, it seems this may be expected behaviour. Calling send from a coroutine launched inside the callback seems to fix it. This is quite unexpected to me, though, and it should be mentioned clearly in the channelFlow documentation, whose example uses offer.
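Here is a sketch of the launch-and-send workaround. FakeSource is a hypothetical callback API standing in for the Firebase listener, and the values "a" and "b" are arbitrary. If the flow has already been cancelled when the callback fires, launch just creates a child coroutine that is cancelled along with its scope. Nothing is thrown back into the callback.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback API standing in for a Firebase listener registration.
class FakeSource {
    private val listeners = mutableListOf<(String) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun addListener(listener: (String) -> Unit) { listeners += listener }
    fun removeListener(listener: (String) -> Unit) { listeners -= listener }
    // Invokes every registered listener synchronously, like a main-thread callback.
    fun fire(value: String) = listeners.toList().forEach { it(value) }
}

fun FakeSource.asValueFlow(): Flow<String> = channelFlow {
    val listener: (String) -> Unit = { value ->
        // Hand the value to a child coroutine and use the suspending send.
        launch { send(value) }
    }
    addListener(listener)
    awaitClose { removeListener(listener) }
}

// Collects two values, then reports them and how many listeners remain registered.
fun demo(): Pair<List<String>, Int> = runBlocking {
    val source = FakeSource()
    val result = async { source.asValueFlow().take(2).toList() }
    while (source.listenerCount == 0) yield() // wait until the flow has registered its listener
    source.fire("a")
    source.fire("b")
    result.await() to source.listenerCount
}
```

On the single-threaded runBlocking loop the launched senders run in order, so demo() returns a and b with no listener left registered. On a multi-threaded dispatcher, separately launched senders are not guaranteed to deliver in callback order.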
offer is for non-suspending contexts, while send is for suspending ones.
offer is, unfortunately, not symmetric to send in terms of propagated exceptions: a CancellationException from send is usually ignored, while a CancellationException from offer in a non-suspending context is not.
We hope to fix it in #974, either with offerOrClosed or by changing offer semantics.
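Later kotlinx.coroutines releases (1.5 and newer) deprecate offer and add trySend. trySend is a non-suspending send that returns a ChannelResult instead of throwing when the channel is already closed or cancelled. The sketch below shows that behaviour on a plain Channel, assuming such a release. In a channelFlow or callbackFlow listener, calling trySend(value) where the code above calls offer(value) would turn a late callback into a dropped value rather than a crash.

```kotlin
import kotlinx.coroutines.channels.Channel

// Returns (first send succeeded, second send succeeded, second send saw a closed channel).
fun trySendAroundCancel(): Triple<Boolean, Boolean, Boolean> {
    val channel = Channel<String>(Channel.BUFFERED)
    val before = channel.trySend("a")
    // Cancel the channel, roughly what happens to an inner channelFlow's channel
    // once flatMapLatest moves on to a new upstream value.
    channel.cancel()
    val after = channel.trySend("b") // does not throw; the failure is in the result
    return Triple(before.isSuccess, after.isSuccess, after.isClosed)
}
```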