IO.fx {
dispatchers().io().raceN(timer().sleep(10.seconds).followedBy(effect { println("Hello") }), effect { println("Hello world") })
.fork().bind().join().bind()
}.suspended()
// => Cancellation exception
This however works fine:
IO.fx {
dispatchers().io().raceN(timer().sleep(10.seconds).followedBy(effect { println("Hello") }), effect { println("Hello world") })
.bind()
}.suspended()
This another example also raises the same exception:
IO.raceN(IO.never, IO.never, IO.never, just(1))
.map {
it.fold(
::identity,
::identity,
::identity,
::identity
)
}
It might have to do with cancellation in general, although not appearing in other tests for some unknown reason. It seems the following code in IORunLoop might be a lead:
if (conn.isCanceled()) {
cb(Left(OnCancel.CancellationException))
return
}
It's a problem with how we currently deal with cancelation in the IORunLoop since cb(Left(OnCancel.CancellationException)) will get thrown in some cases. But when that happens within another IO that IO catches the cancelation exception and fails with that.
Other IOs don't rely on CancelationException, and we don't have to either.
@pakoito mentioned this was added for KotlinX to be able to cancel delay, howeverthat was beforeCoroutineScope` & structured concurrency were introduced in KotlinX so that might not be relevant anymore.
So if we can investigate that as part of the KotlinX cancelation integration efforts than we might be able to fix this easily and remove CancelationException.
Other IOs don't rely on CancelationException, and we don't have to either.
@pakoito mentioned this was added for KotlinX to be able to cancel delay, howeverthat was beforeCoroutineScope` & structured concurrency were introduced in KotlinX so that might not be relevant anymore.
AFAIK the cancellation in coroutines still seems to be necessary. In any case looks like that the cancellation is also used internally to cancel other ops, otherwise an IO.Async with a timeout wouldn't be stopped it seems.
@nomisRev suggested to remove the callback call to pass the exception
The problem is in the raceN implementation, where cancelation errors aren't suppressed correctly because we're effectively launching a new coroutine and don't check for that case. The PR above removes it from everywhere, which is an API change.
Indeed that's the case, I've reimplemented the IO's inner race to be cancellable by default, therefore all raceN calling this one within IO do not fail anymore. But if I remember correctly the Concurrent version of race would still suffer from the same problem, which means that depending on how you declare your IO's fx block you might run into the problem or not (e.g. calling dispatchers().io().raceN). I believe we should also add extra tests to verify the purpose of the exception itself.
In any case we can discuss if it makes more sense trying to split the PR into 2, one trying to address the race problem itself + adding extra tests and the other removing the exception if we consider it necessary.
+1 for the split
@pakoito there you go! #1986 It actually works with the exception, so we can consider removing it later if needed 馃檶
Most helpful comment
It's a problem with how we currently deal with cancelation in the
IORunLoopsincecb(Left(OnCancel.CancellationException))will get thrown in some cases. But when that happens within anotherIOthatIOcatches the cancelation exception and fails with that.Other
IOs don't rely onCancelationException, and we don't have to either.@pakoito mentioned this was added for KotlinX to be able to cancel
delay, howeverthat was beforeCoroutineScope` & structured concurrency were introduced in KotlinX so that might not be relevant anymore.So if we can investigate that as part of the KotlinX cancelation integration efforts than we might be able to fix this easily and remove
CancelationException.