Why does the following code call the uncaught exception handler?
Since an onError is supplied, I expected it to be called instead.
import io.reactivex.Observable
import io.reactivex.Single
import kotlinx.coroutines.experimental.rx2.await
import kotlinx.coroutines.experimental.rx2.rxSingle
import java.io.IOException
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
    Observable
        .interval(1, TimeUnit.MILLISECONDS)
        .switchMapSingle {
            rxSingle {
                timeBomb().await()
            }
        }
        .subscribe({}, {})
    Thread.sleep(1000)
}
private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS)
    .doOnSuccess { throw IOException("") }
I may be wrong, but I think it's more of an Rx issue. I have the same problem without coroutines:
import io.reactivex.Observable
import io.reactivex.Single
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
    Observable
        .interval(1, TimeUnit.MILLISECONDS)
        .switchMapSingle {
            timeBomb()
        }
        .blockingSubscribe(
            { println("element = $it") },
            { println("error: ${it.message}") }
        )
}
private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS)
    .doOnSuccess { throw Exception("something went wrong") }
Nope, that one calls the RxJavaPlugins.setErrorHandler error handler with an UndeliverableException. My example calls the thread's UncaughtExceptionHandler.
Yes indeed, sorry.
However, Rx shouldn't throw an UndeliverableException if onError is implemented (which is the case in my example).
So what is your expected behavior?
Do you want the exception to be delivered to the Rx error handler instead of the UncaughtExceptionHandler?
Or do you want only onError to be called, with no uncaught exception? (In that case even the plain Rx code doesn't work.)
I expect exceptions that can't be delivered inside the RxJava chain to be delivered to the RxJavaPlugins.
@akarnokd What do you think is the correct behavior here?
You have switchMapSingle that cancels the current outstanding Single when switching. At this point the await crashes and since that Single is no longer the current, it counts as an undeliverable exception.
Many blocking (IO) APIs respond to async cancellations with exceptions so that their blocking awaiters can be unblocked and resumed with a failure. I assume coroutines do the same minus the blocking part.
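The "no longer current" case described above can be reproduced with plain RxJava 2, without switchMapSingle or coroutines. The following is a minimal sketch, not code from this thread: the helper name lateErrorAfterDispose is hypothetical, and it assumes RxJava 2.x on the classpath. It disposes a Single and only then signals an error through its emitter, which is roughly what happens to the inner Single after a switch.

```kotlin
import io.reactivex.Single
import io.reactivex.SingleEmitter
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.plugins.RxJavaPlugins
import java.io.IOException

// Hypothetical helper: returns whatever reached the global Rx error handler.
fun lateErrorAfterDispose(): Throwable? {
    var undelivered: Throwable? = null
    var delivered: Throwable? = null
    RxJavaPlugins.setErrorHandler { e -> undelivered = e }

    // Single.create subscribes synchronously, so the emitter is captured
    // before subscribe() returns.
    var emitter: SingleEmitter<Int>? = null
    val disposable = Single.create<Int> { e -> emitter = e }
        .subscribe({ }, { e -> delivered = e })

    // Comparable to switchMapSingle dropping the previous inner Single.
    disposable.dispose()
    // The error arrives too late: the subscriber's onError is not called.
    emitter!!.onError(IOException("late failure"))

    RxJavaPlugins.reset() // restore the default handler
    check(delivered == null) { "onError should not have been called" }
    return undelivered
}

fun main() {
    val e = lateErrorAfterDispose()
    println("handler received UndeliverableException: ${e is UndeliverableException}")
    println("cause is IOException: ${e?.cause is IOException}")
}
```

The onError consumer passed to subscribe is never invoked here, which matches the observation that supplying onError does not prevent the exception from being treated as undeliverable.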
I'm not sure if there is anything that can be done in coroutines; however, I find it related to issue #251. Maybe we can find a common solution to both.
Well, in RxJava, exceptions that cannot be delivered get wrapped in an UndeliverableException and forwarded to the error handler by RxJavaPlugins#onError.
This is nice because I can set an error handler and not crash my app.
I expect the coroutine to catch that exception if it's not deliverable and forward it to the plugins too.
Otherwise I would need a generic try-catch around every place where coroutines and Rx interoperate.
The current approach in kotlinx.coroutines is that all undeliverable (uncaught) exceptions are forwarded to a CoroutineExceptionHandler that you can explicitly install in your context to prevent crashes. However, the goal is a design that does not produce "racy" uncaught exceptions (ones that occur only when certain events happen at exactly the right time).
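As a rough illustration of installing a handler in the context, here is a sketch that uses only kotlinx.coroutines and no Rx. It assumes a current kotlinx.coroutines release, where the packages are no longer under `experimental` as they are elsewhere in this thread. The function name uncaughtGoesToHandler is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Hypothetical helper: returns the exception that reached the installed handler.
fun uncaughtGoesToHandler(): Throwable? {
    var seen: Throwable? = null
    // The handler is a context element; it receives exceptions that a
    // root coroutine started with launch fails with.
    val handler = CoroutineExceptionHandler { _, e -> seen = e }
    val scope = CoroutineScope(Job() + handler)

    runBlocking {
        // join() waits for the failed coroutine to complete.
        scope.launch { throw IOException("boom") }.join()
    }
    return seen
}

fun main() {
    println("handler saw: ${uncaughtGoesToHandler()}")
}
```

Without the handler in the context, the same exception would end up in the thread's UncaughtExceptionHandler, which is the crash reported at the top of this thread.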
Ah that's nice. So what about installing this by default in all rxXYZ methods?
object RxCoroutineExceptionHandler : AbstractCoroutineContextElement(CoroutineExceptionHandler),
    CoroutineExceptionHandler {
    override fun handleException(context: CoroutineContext, exception: Throwable) {
        RxJavaPlugins.onError(exception)
    }
}
So it can be used like this:
fun <T> rxSingle(
    context: CoroutineContext = DefaultDispatcher,
    parent: Job? = null,
    block: suspend CoroutineScope.() -> T
): Single<T> = Single.create { subscriber ->
    val newContext = newCoroutineContext(context + RxCoroutineExceptionHandler, parent)
    val coroutine = RxSingleCoroutine(newContext, subscriber)
    subscriber.setCancellable(coroutine)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
I don't know if that is the right approach. I'm wary of statics. This way, the behavior of some library code that uses rxSingle will non-trivially depend on some static initialization that was done elsewhere. I'd concentrate on fixing the core issue of racy exceptions.
Any news on this? This makes coroutines-rx integration quite painful for me.
I always have try-catch blocks because otherwise my app crashes unpredictably.
I've revisited the topic starter's code and I still don't see what can be done about it in kotlinx.coroutines. The timeBomb function has a bug in it. It crashes inside the doOnSuccess handler, which, in the Rx world, results in io.reactivex.exceptions.UndeliverableException: java.io.IOException. In the Rx world all such exceptions are routed to the Rx error handler. So if I add the following code to the beginning of main:
RxJavaPlugins.setErrorHandler { e ->
    println("ErrorHandler received exception $e")
}
then I see that it is invoked from time to time.
How is that related to kotlinx.coroutines and what can we possibly do about it?
If that were the case I'd be happy.
For me the error handler is not called:
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.plugins.RxJavaPlugins
import kotlinx.coroutines.experimental.rx2.await
import kotlinx.coroutines.experimental.rx2.rxSingle
import java.io.IOException
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
    RxJavaPlugins.setErrorHandler {
        println("error handler called")
    }
    Observable
        .interval(1, TimeUnit.MILLISECONDS)
        .switchMapSingle {
            rxSingle {
                timeBomb().await()
            }
        }
        .subscribe({}, {})
    Thread.sleep(1000)
}
private fun timeBomb() = Single.timer(1, TimeUnit.MILLISECONDS)
    .doOnSuccess { throw IOException("") }
It crashes the current thread:
Exception in thread "RxComputationThreadPool-6" java.io.IOException:
at CRKt$timeBomb$1.accept(CR.kt:26)
at CRKt$timeBomb$1.accept(CR.kt)
at io.reactivex.internal.operators.single.SingleDoOnSuccess$DoOnSuccess.onSuccess(SingleDoOnSuccess.java:53)
at io.reactivex.internal.operators.single.SingleTimer$TimerDisposable.run(SingleTimer.java:56)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "RxComputationThreadPool-3" java.io.IOException:
at CRKt$timeBomb$1.accept(CR.kt:26)
at CRKt$timeBomb$1.accept(CR.kt)
at io.reactivex.internal.operators.single.SingleDoOnSuccess$DoOnSuccess.onSuccess(SingleDoOnSuccess.java:53)
at io.reactivex.internal.operators.single.SingleTimer$TimerDisposable.run(SingleTimer.java:56)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "RxComputationThreadPool-3" java.io.IOException:
at CRKt$timeBomb$1.accept(CR.kt:26)
at CRKt$timeBomb$1.accept(CR.kt)
at io.reactivex.internal.operators.single.SingleDoOnSuccess$DoOnSuccess.onSuccess(SingleDoOnSuccess.java:53)
at io.reactivex.internal.operators.single.SingleTimer$TimerDisposable.run(SingleTimer.java:56)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
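Actually, it was not happening every time. This code reproduces it reliably for me:
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.plugins.RxJavaPlugins
import kotlinx.coroutines.experimental.rx2.await
import kotlinx.coroutines.experimental.rx2.rxSingle
import java.io.IOException
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
    RxJavaPlugins.setErrorHandler {
        println("error handler called")
    }
    // Fires almost immediately, then throws from inside doOnSuccess.
    val timeBomb = Single.timer(1, TimeUnit.NANOSECONDS)
        .doOnSuccess { throw IOException("") }
    Observable
        .interval(1, TimeUnit.MILLISECONDS)
        .switchMapSingle {
            rxSingle {
                timeBomb.await()
            }.onErrorReturnItem(1)
        }
        .test()
        .await()
}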
Thanks a lot for this reproducer. It indeed clearly shows a problem. The solution is not obvious, though. What I can recommend so far is to install a CoroutineExceptionHandler that delegates to your RxJavaPlugins error handler:
val coroutineHandler = CoroutineExceptionHandler { _, t ->
    RxJavaPlugins.getErrorHandler().accept(t)
}
....
rxSingle(coroutineHandler) { ... }
That will ensure that your "async exceptions" are handled in the same way regardless of whether they were caught by Rx or by coroutines machinery.
For v2, it is RxJavaPlugins.onError(exception);.
Thanks, I've been doing that for some time now.
It is crucial to filter out CancellationExceptions, otherwise they end up in your RxJava error handler.
import io.reactivex.plugins.RxJavaPlugins
import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CoroutineExceptionHandler
import kotlin.coroutines.experimental.AbstractCoroutineContextElement
import kotlin.coroutines.experimental.CoroutineContext
object RxCoroutineExceptionHandler : AbstractCoroutineContextElement(CoroutineExceptionHandler),
    CoroutineExceptionHandler {
    override fun handleException(context: CoroutineContext, exception: Throwable) {
        if (exception is CancellationException) return
        RxJavaPlugins.onError(exception)
    }
}
My initial suggestion was to install this by default in the rxXYZ functions. What about that?
We could use a default coroutine exception handler for the rxXXX functions that delegates to the Rx error handler. I wonder if that could break anything else.
In general, it's not an Rx-specific issue; the same problem arises in any cancellable asynchronous API. And there is no good solution: one option is to ignore unhandled exceptions (which can lead to a lot of hidden, undebuggable bugs), the other is to always handle them and crash the related coroutines (which can lead to a lot of pain with try-catch machinery).
Thus unconditionally installing a default Rx handler can lead to undesired behaviour. Moreover, it's easy to forget the Rx handler when launching nested coroutines that are not Rx-based.
We may do the following:
1) Introduce RxCoroutineExceptionHandler
2) After #410 introduce RxScope with pre-installed RxCoroutineExceptionHandler.
Then you can choose between (standard) GlobalScope.rxSingle { ... } and RxScope.rxSingle { ... }
Hey everyone, any updates on this issue? We had the same problem described here, and we fixed it by using the RxCoroutineExceptionHandler in all our rxSingle/rxCompletable invocations.
Is this the best way to solve it? Should RxCoroutineExceptionHandler be included in the library as the default exception handler for all the methods that transform a suspending function into an Rx object?
@fabioCollini yes, I've made the fix for 1.3.3.
The current solution is to report undeliverable exceptions to RxJavaPlugins, so @jcornaz's example without coroutines and the original one now behave similarly.