When using the zip operator on Observable or Single together with a blocking call, there doesn't seem to be a way to propagate a single error consistently, since mergeArrayDelayError() and similar operators all accept only a single data type.
In this example, I have two Single sources, each subscribed on Schedulers.io() so they run in parallel, which are passed into zip and then executed synchronously through blockingGet().
What I'd expect is that the exception is thrown by this latter method. What happens though is that the exception is passed to the default uncaught exception handler from RxJavaPlugins.onError().
Am I using the wrong approach or is there an issue with error propagation here?
RxJava 2.1.3 on Android.
Here's the (failing) JUnit test:
@Test(expected = InterruptedException.class)
public void testParallelZip_exceptionNotPropagated() throws Exception {
    Single<Object> allPeopleSource = Single.fromCallable(() -> {
        throw new InterruptedException();
    }).subscribeOn(Schedulers.io());
    Single<String> idsSource = Single.fromCallable(() -> "second_single").subscribeOn(Schedulers.io());
    Single.zip(allPeopleSource, idsSource, (o, s) -> "result").blockingGet();
}
And here's the failure crash log:
java.lang.Exception: Unexpected exception, expected<java.lang.InterruptedException> but was<java.lang.RuntimeException>
at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:28)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45)
at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:91)
at io.reactivex.Single.blockingGet(Single.java:2154)
at [...].RxUtilsTest.testParallelZip_exceptionNotPropagated(RxUtilsTest.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
... 20 more
Caused by: java.lang.InterruptedException
at com.teamwork.data.util.RxUtilsTest.lambda$testParallelZip_exceptionNotPropagated$1(RxUtilsTest.java:66)
at io.reactivex.internal.operators.single.SingleFromCallable.subscribeActual(SingleFromCallable.java:35)
at io.reactivex.Single.subscribe(Single.java:2700)
at io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
InterruptedException is a checked exception, and we chose not to have blockingGet declare throws Exception in its signature. It therefore has to wrap any checked exception in a RuntimeException, which is what you received. You have to unwrap the inner InterruptedException, which JUnit doesn't let you express via its expected = attribute afaik. Instead, you have to try-catch manually:
@Test
public void testParallelZip_exceptionNotPropagated() throws Exception {
    Single<Object> allPeopleSource = Single.fromCallable(() -> {
        throw new InterruptedException();
    }).subscribeOn(Schedulers.io());
    Single<String> idsSource = Single.fromCallable(() -> "second_single")
            .subscribeOn(Schedulers.io());
    try {
        Single.zip(allPeopleSource, idsSource, (o, s) -> "result").blockingGet();
        fail("Should have thrown!");
    } catch (AssertionError ex) {
        throw ex;
    } catch (RuntimeException ex) {
        assertTrue(ex + "", ex.getCause() instanceof InterruptedException);
    }
}
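For readers without RxJava on the classpath, the wrapping rule described above can be sketched in plain Java. The class BlockingWrapSketch and its blockingGet helper are hypothetical stand-ins; wrapOrThrow is only modeled on the ExceptionHelper.wrapOrThrow frame visible in the stack trace, so treat this as an illustration of the behavior rather than the library source.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for the wrapping performed on the blocking path; not RxJava code.
final class BlockingWrapSketch {

    // Errors are rethrown, unchecked exceptions pass through, checked ones are wrapped.
    static RuntimeException wrapOrThrow(Throwable error) {
        if (error instanceof Error) {
            throw (Error) error;
        }
        if (error instanceof RuntimeException) {
            return (RuntimeException) error;
        }
        return new RuntimeException(error);
    }

    // Runs the callable and surfaces any failure without declaring a checked exception.
    static <T> T blockingGet(Callable<T> source) {
        try {
            return source.call();
        } catch (Throwable t) {
            throw wrapOrThrow(t);
        }
    }

    public static void main(String[] args) {
        try {
            blockingGet(() -> { throw new InterruptedException(); });
        } catch (RuntimeException ex) {
            // The checked exception is only reachable through getCause().
            System.out.println(ex.getCause() instanceof InterruptedException);
        }
    }
}
```

Because the method signature declares no checked exceptions, the only place the original InterruptedException can travel is the cause of the RuntimeException, which is why the test has to inspect getCause().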
@akarnokd apologies, I've changed my implementation from Observable to Single and that changed the result entirely.
Take this as an example of my scenario, but I haven't been able to reproduce the issue in a unit test:
Observable<Object> source1 = Observable.fromCallable(() -> {
    throw new InterruptedException();
}).subscribeOn(Schedulers.io());
Observable<String> source2 = Observable.fromCallable(() -> "second_observable")
        .subscribeOn(Schedulers.io());
Observable.zip(source1, source2, (o, s) -> "result").blockingSingle();
The exception I get originates from
io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:48)
which means that the Observer seems to have been already disposed (from ObservableFromCallable, in my case RxJavaPlugins.onError(e) is executed):
if (!d.isDisposed()) {
    s.onError(e);
} else {
    RxJavaPlugins.onError(e);
}
The two source observables are obviously not shared among threads and they're just local variables. Can you think of any cases where this would happen?
No. If it went through RxJavaPlugins.onError, you'd see the UndeliverableException in the stacktrace. Given that only one of the sources throws, that path can't be taken, as there is nothing to dispose the chain. So either you misunderstood something or you have a different flow setup than in the top post.
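To make the two paths concrete, here is a simplified plain-Java model of the isDisposed() branch quoted above. Every name in it (UndeliverableSketch, Observer, undelivered) is hypothetical and stands in for the RxJava internals; the list plays the role of the global RxJavaPlugins.onError hook. It only shows that the same thrown exception ends up in different places depending on whether the observer was disposed before the callable failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the quoted isDisposed() branch; all names are hypothetical, not RxJava API.
final class UndeliverableSketch {

    // Stand-in for the global hook: collects errors that had no live observer to receive them.
    static final List<Throwable> undelivered = new ArrayList<>();

    // Minimal observer with a disposed flag and a slot for the delivered error.
    static final class Observer {
        final AtomicBoolean disposed = new AtomicBoolean();
        Throwable received;

        void onError(Throwable e) {
            received = e;
        }
    }

    // Same shape as the quoted snippet: deliver if still subscribed, else route to the hook.
    static void subscribeActual(Callable<?> callable, Observer observer) {
        try {
            callable.call();
        } catch (Throwable e) {
            if (!observer.disposed.get()) {
                observer.onError(e);
            } else {
                undelivered.add(e);
            }
        }
    }

    public static void main(String[] args) {
        Observer live = new Observer();
        subscribeActual(() -> { throw new InterruptedException(); }, live);

        Observer gone = new Observer();
        gone.disposed.set(true); // disposed before the callable fails
        subscribeActual(() -> { throw new InterruptedException(); }, gone);

        System.out.println("delivered to live observer: " + live.received);
        System.out.println("routed to global hook: " + undelivered.size());
    }
}
```

In this model the second path is reached only if something sets the disposed flag before the failure, which is the condition the reply above says should not occur when just one source throws.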
@akarnokd I do see the UndeliverableException in the crash logs. The log I put earlier on referred to the other case, apologies again for confusing you.
E/UncaughtException: io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:48)
at io.reactivex.Observable.subscribe(Observable.java:10838)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:152)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:265)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
at java.lang.Thread.run(Thread.java:841)
Caused by: java.lang.InterruptedException (...)
So what remains, @marcosalis, is for you to post the code that causes that exception so I can reproduce the issue.
@akarnokd, unfortunately I can't reproduce the issue outside of the production code of the app I'm working on. What seems to be happening is some kind of weird interaction between the RxJava scheduler threads, their interrupted state, and the fact that the Observable seems to be disposed right after it's created. I will need to investigate this further. In the meantime I'm closing the issue, as this doesn't seem to happen when using Single instead of Observable (I guess because of the different implementation of SingleFromCallable.subscribeActual, which never calls RxJavaPlugins.onError).