RxJava: 2.x Testing a Single unexpectedly results in an IllegalStateException with message: "onSubscribe not called in proper order"

Created on 12 May 2017 · 13 Comments · Source: ReactiveX/RxJava

RxJava 2.0.3.

Sample code with the problem:


@Test public void test() throws Exception {
        mockNoNetwork(); // will result in an exception with NO_NETWORK error code as shown below

        service.getSingle()
                .compose(ApiHelper.checkForNetwork())
                .test()
                .assertError(throwable -> {
                    if (throwable instanceof ApiException) {
                        int errorCode = ((ApiException) throwable).getErrorCode();
                        return errorCode == NO_NETWORK;
                    }
                    return false;
                });
}

// ApiHelper:
    public static <T> SingleTransformer<Result<T>, T> checkForNetwork() {
        return upstream ->
                upstream.delaySubscription(isNetworkAvailable())
                        .observeOn(ioScheduler) // in tests, changed to Trampoline
                        .map(checkForErrors());
    }

    private static CompletableSource isNetworkAvailable() {
        return subscriber -> {
            if (isNetworkUnavailable()) { // this is true in test
                subscriber.onError(ApiException.fromErrorCode(NO_NETWORK));
            } else {
                subscriber.onComplete();
            }
        };
    }

I'm getting the exception I want, but there is an additional exception thrown which seems to be complaining about a failure to subscribe to the Single (and this fails the test). What am I missing?

Here is the stack trace:

java.lang.AssertionError: Error present but other errors as well (latch = 0, values = 0, errors = 2, completions = 0)

    at io.reactivex.observers.BaseTestConsumer.fail(BaseTestConsumer.java:133)
    at io.reactivex.observers.BaseTestConsumer.assertError(BaseTestConsumer.java:278)
    at com.myapp.backend.retrofit.membership.ServiceTest.test(ServiceTest.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: io.reactivex.exceptions.CompositeException: 2 exceptions occurred. 
    at io.reactivex.observers.BaseTestConsumer.fail(BaseTestConsumer.java:138)
    ... 31 more
Caused by: io.reactivex.exceptions.CompositeException$CompositeExceptionCausalChain: Chain of Causes for CompositeException In Order Received =>
    at io.reactivex.exceptions.CompositeException.getCause(CompositeException.java:105)
    at java.lang.Throwable.printEnclosedStackTrace(Throwable.java:707)
    at java.lang.Throwable.printStackTrace(Throwable.java:667)
    at java.lang.Throwable.printStackTrace(Throwable.java:721)
    at org.junit.runner.notification.Failure.getTrace(Failure.java:75)
    at com.intellij.junit4.JUnit4TestListener.getTrace(JUnit4TestListener.java:297)
    at com.intellij.junit4.JUnit4TestListener.testFailure(JUnit4TestListener.java:279)
    at com.intellij.junit4.JUnit4TestListener.testFailure(JUnit4TestListener.java:231)
    at com.intellij.junit4.JUnit4TestListener.testFailure(JUnit4TestListener.java:207)
    at org.junit.runner.notification.SynchronizedRunListener.testFailure(SynchronizedRunListener.java:63)
    at org.junit.runner.notification.RunNotifier$4.notifyListener(RunNotifier.java:142)
    at org.junit.runner.notification.RunNotifier$SafeNotifier.run(RunNotifier.java:72)
    at org.junit.runner.notification.RunNotifier.fireTestFailures(RunNotifier.java:138)
    at org.junit.runner.notification.RunNotifier.fireTestFailure(RunNotifier.java:132)
    at org.junit.internal.runners.model.EachTestNotifier.addFailure(EachTestNotifier.java:23)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:329)
    ... 18 more
Caused by: java.lang.IllegalStateException: onSubscribe not called in proper order
    at io.reactivex.observers.TestObserver.onError(TestObserver.java:163)
    at io.reactivex.internal.operators.single.SingleMap$1.onError(SingleMap.java:55)
    at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.run(SingleObserveOn.java:79)
    at io.reactivex.internal.schedulers.TrampolineScheduler.scheduleDirect(TrampolineScheduler.java:49)
    at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.onError(SingleObserveOn.java:71)
    at io.reactivex.internal.operators.single.SingleDelayWithCompletable$OtherObserver.onError(SingleDelayWithCompletable.java:65)
    at com.myapp.backend.retrofit.ApiHelper.lambda$checkForNetwork$2(ApiHelper.java:77)
    at io.reactivex.internal.operators.single.SingleDelayWithCompletable.subscribeActual(SingleDelayWithCompletable.java:36)
    at io.reactivex.Single.subscribe(Single.java:2656)
    at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
    at io.reactivex.Single.subscribe(Single.java:2656)
    at io.reactivex.internal.operators.single.SingleMap.subscribeActual(SingleMap.java:33)
    at io.reactivex.Single.subscribe(Single.java:2656)
    at io.reactivex.Single.test(Single.java:3067)
    at com.myapp.backend.retrofit.membership.ServiceTest.test(ServiceTest.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    ... 18 more
Caused by: com.myapp.backend.retrofit.ApiException: Code: -4
    at com.myapp.backend.retrofit.ApiException.fromErrorCode(ApiException.java:15)
    ... 38 more
2.x Question

All 13 comments

Please try with the current RxJava 2.1.0 version.

Okay, right, your custom CompletableSource implementation is wrong:

private static CompletableSource isNetworkAvailable() {
        return subscriber -> {
            if (isNetworkUnavailable()) { // this is true in test
                subscriber.onError(ApiException.fromErrorCode(NO_NETWORK));
            } else {
                subscriber.onComplete();
            }
        };
    }

When creating sources this way (which is not recommended), you are responsible for following the protocol, which includes calling onSubscribe() on the observer before any onError() or onComplete(). Please use Completable.create() instead.

That did it! Changing the CompletableSource implementation to the following results in a passing test:

        return Completable.create(emitter -> {
            if (isNetworkUnavailable()) {
                emitter.onError(ApiException.fromErrorCode(NO_NETWORK));
            } else {
                emitter.onComplete();
            }
        });

Thank you!
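To make the protocol requirement concrete, here is a minimal sketch in plain Java. `MiniObserver`, `MiniSource`, and `RecordingObserver` are hypothetical stand-ins invented for this illustration, not RxJava classes. The recording observer only imitates the kind of ordering check that produced the second error ("onSubscribe not called in proper order") in the stack trace above. It assumes nothing beyond the JDK.

```java
import java.util.ArrayList;
import java.util.List;

final class ProtocolSketch {

    /** Hypothetical stand-in for an observer; not the RxJava interface. */
    interface MiniObserver {
        void onSubscribe();
        void onError(Throwable t);
        void onComplete();
    }

    /** Hypothetical stand-in for a source that can be subscribed to. */
    interface MiniSource {
        void subscribe(MiniObserver observer);
    }

    /** Records errors and flags any terminal event that arrives before onSubscribe. */
    static final class RecordingObserver implements MiniObserver {
        final List<Throwable> errors = new ArrayList<>();
        private boolean subscribed;

        @Override public void onSubscribe() { subscribed = true; }

        @Override public void onError(Throwable t) {
            checkOrder();
            errors.add(t);
        }

        @Override public void onComplete() { checkOrder(); }

        private void checkOrder() {
            if (!subscribed) {
                errors.add(new IllegalStateException("onSubscribe not called in proper order"));
            }
        }
    }

    /** Mirrors the original lambda: signals the error without calling onSubscribe first. */
    static final MiniSource BROKEN =
            observer -> observer.onError(new RuntimeException("no network"));

    /** Follows the protocol: onSubscribe first, then the terminal event. */
    static final MiniSource CORRECT = observer -> {
        observer.onSubscribe();
        observer.onError(new RuntimeException("no network"));
    };

    /** Subscribes a fresh recording observer and returns how many errors it saw. */
    static int errorCount(MiniSource source) {
        RecordingObserver observer = new RecordingObserver();
        source.subscribe(observer);
        return observer.errors.size();
    }

    public static void main(String[] args) {
        System.out.println("broken source:  " + errorCount(BROKEN) + " errors");
        System.out.println("correct source: " + errorCount(CORRECT) + " errors");
    }
}
```

In this model the broken source yields two recorded errors (the ordering violation plus the real one), while the correct source yields only the expected error. Using Completable.create(), as in the fix above, hands that bookkeeping to the library.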

Hm, maybe Single.delaySubscription() isn't what I want.

I have been using Flowables with startWith() for this same check, and that works fine. But semantically Single makes more sense, hence my current refactoring efforts. However, when my CompletableSource calls emitter.onError(), my app crashes with UndeliverableException. This is actually what I initially expected given the javadoc for delaySubscription(), which states:

If the delaying source signals an error, that error is re-emitted and no subscription to the current Single happens.

Which operator ought I to be using? Thanks for your time.

Since this is becoming more like a usage question, I have posted this to Stack Overflow: http://stackoverflow.com/q/43988752/2740621

CompletableSource calls emitter.onError(), my app crashes with UndeliverableException

What is the stacktrace of the crash?

This is an example I achieved by setting my device to airplane mode before navigating to a particular screen.

05-15 17:11:19.025 8777-8777/com.myapp E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.myapp, PID: 8777
io.reactivex.exceptions.UndeliverableException: com.myapp.backend.retrofit.ApiException: Code: -4
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
    at io.reactivex.internal.operators.completable.CompletableCreate$Emitter.onError(CompletableCreate.java:92)
    at com.myapp.backend.retrofit.ApiHelper.lambda$checkForNetworkSingle$2(ApiHelper.java:86)
    at com.myapp.backend.retrofit.ApiHelper.access$lambda$2(ApiHelper.java)
    at com.myapp.backend.retrofit.ApiHelper$$Lambda$3.subscribe(Unknown Source)
    at io.reactivex.internal.operators.completable.CompletableCreate.subscribeActual(CompletableCreate.java:39)
    at io.reactivex.Completable.subscribe(Completable.java:1635)
    at io.reactivex.internal.operators.single.SingleDelayWithCompletable.subscribeActual(SingleDelayWithCompletable.java:36)
    at io.reactivex.Single.subscribe(Single.java:2703)
    at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
    at io.reactivex.Single.subscribe(Single.java:2703)
    at io.reactivex.internal.operators.single.SingleMap.subscribeActual(SingleMap.java:33)
    at io.reactivex.Single.subscribe(Single.java:2703)
    at io.reactivex.internal.operators.single.SingleMap.subscribeActual(SingleMap.java:33)
    at io.reactivex.Single.subscribe(Single.java:2703)
    at io.reactivex.internal.operators.single.SingleZipArray.subscribeActual(SingleZipArray.java:64)
    at io.reactivex.Single.subscribe(Single.java:2703)
    at io.reactivex.Single.subscribe(Single.java:2689)
    at com.myapp.mvp.upgrade.UpgradeModel.retrieveLicenseKeyAndDeveloperPayload(UpgradeModel.java:156)
    at com.myapp.mvp.upgrade.UpgradeModel.initializeBilling(UpgradeModel.java:147)
    at com.myapp.mvp.upgrade.UpgradePresenter.initializeBilling(UpgradePresenter.java:73)
    at com.myapp.mvp.upgrade.UpgradePresenter.attachView(UpgradePresenter.java:58)
    at com.myapp.mvp.upgrade.UpgradePresenter.attachView(UpgradePresenter.java:26)
    at com.myapp.mvp.upgrade.UpgradeFragment.onResume(UpgradeFragment.java:101)
    at android.support.v4.app.Fragment.performResume(Fragment.java:2238)
    at android.support.v4.app.FragmentManagerImpl.moveToState(FragmentManager.java:1346)
    at android.support.v4.app.FragmentManagerImpl.moveFragmentToExpectedState(FragmentManager.java:1528)
    at android.support.v4.app.FragmentManagerImpl.moveToState(FragmentManager.java:1595)
    at android.support.v4.app.BackStackRecord.executeOps(BackStackRecord.java:758)
    at android.support.v4.app.FragmentManagerImpl.executeOps(FragmentManager.java:2363)
    at android.support.v4.app.FragmentManagerImpl.executeOpsTogether(FragmentManager.java:2149)
    at android.support.v4.app.FragmentManagerImpl.optimizeAndExecuteOps(FragmentManager.java:2103)
    at android.support.v4.app.FragmentManagerImpl.execPendingActions(FragmentManager.java:2013)
    at android.support.v4.app.FragmentManagerImpl$1.run(FragmentManager.java:710)
    at android.os.Handler.handleCallback(Handler.java:751)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:154)
    at android.app.ActivityThread.main(ActivityThread.java:6121)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:889)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:779)
 Caused by: com.myapp.backend.retrofit.ApiException: Code: -4
    at com.myapp.backend.retrofit.ApiHelper.lambda$checkForNetworkSingle$2(ApiHelper.java:86)
    at com.myapp.backend.retrofit.ApiHelper.access$lambda$2(ApiHelper.java)
    at com.myapp.backend.retrofit.ApiHelper$$Lambda$3.subscribe(Unknown Source)
    at io.reactivex.internal.operators.completable.CompletableCreate.subscribeActual(CompletableCreate.java:39)
    at io.reactivex.Completable.subscribe(Completable.java:1635)
    at io.reactivex.internal.operators.single.SingleDelayWithCompletable.subscribeActual(SingleDelayWithCompletable.java:36)
    at io.reactivex.Single.subscribe(Single.java:2703)
    at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
    at io.reactivex.Single.subscribe(Single.java:2703)
    at io.reactivex.internal.operators.single.SingleMap.subscribeActual(SingleMap.java:33)
    at io.reactivex.Single.subscribe(Single.java:2703)
    at io.reactivex.internal.operators.single.SingleMap.subscribeActual(SingleMap.java:33)
    at io.reactivex.Single.subscribe(Single.java:2703)
    at io.reactivex.internal.operators.single.SingleZipArray.subscribeActual(SingleZipArray.java:64)
    at io.reactivex.Single.subscribe(Single.java:2703)
    at io.reactivex.Single.subscribe(Single.java:2689)
    at com.myapp.mvp.upgrade.UpgradeModel.retrieveLicenseKeyAndDeveloperPayload(UpgradeModel.java:156)
    at com.myapp.mvp.upgrade.UpgradeModel.initializeBilling(UpgradeModel.java:147)
    at com.myapp.mvp.upgrade.UpgradePresenter.initializeBilling(UpgradePresenter.java:73)
    at com.myapp.mvp.upgrade.UpgradePresenter.attachView(UpgradePresenter.java:58)
    at com.myapp.mvp.upgrade.UpgradePresenter.attachView(UpgradePresenter.java:26)
    at com.myapp.mvp.upgrade.UpgradeFragment.onResume(UpgradeFragment.java:101)
    at android.support.v4.app.Fragment.performResume(Fragment.java:2238)
    at android.support.v4.app.FragmentManagerImpl.moveToState(FragmentManager.java:1346)
    at android.support.v4.app.FragmentManagerImpl.moveFragmentToExpectedState(FragmentManager.java:1528)
    at android.support.v4.app.FragmentManagerImpl.moveToState(FragmentManager.java:1595)
    at android.support.v4.app.BackStackRecord.executeOps(BackStackRecord.java:758)
    at android.support.v4.app.FragmentManagerImpl.executeOps(FragmentManager.java:2363)
    at android.support.v4.app.FragmentManagerImpl.executeOpsTogether(FragmentManager.java:2149)
    at android.support.v4.app.FragmentManagerImpl.optimizeAndExecuteOps(FragmentManager.java:2103)
    at android.support.v4.app.FragmentManagerImpl.execPendingActions(FragmentManager.java:2013)
    at android.support.v4.app.FragmentManagerImpl$1.run(FragmentManager.java:710)
    at android.os.Handler.handleCallback(Handler.java:751)
    at android.os.Handler.dispatchMessage(Handler.java:95)
    at android.os.Looper.loop(Looper.java:154)
    at android.app.ActivityThread.main(ActivityThread.java:6121)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:889)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:779)

Looks like your stream gets cancelled immediately, hence the undeliverable error. This test passes with 2.1:

@Test
public void completableCustomErrors() {
    List<Throwable> errors = TestHelper.trackPluginErrors();

    try {

        Single.just(1)
        .delaySubscription(Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter e) throws Exception {
                if (!e.isDisposed()) {
                    e.onError(new TestException());
                }
            }
        }))
        .test()
        .assertFailure(TestException.class);

        assertTrue(errors.toString(), errors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}

Try with !e.isDisposed() like above to see if you are having errors due to cancellation. If so, consider adding a global error handler as described in the wiki.

Thanks for the reply. Btw, I am on 2.1.0.

I was also able to get that test (and similar) to pass.

After adding the check for !e.isDisposed(), I still experienced the crash in my Android app. So, I kept digging. I realized I did not tell you what is certainly a critical piece of information (in retrospect), which is that I'm using Single.zip() to observe two streams in parallel. However, when I altered the test to use zip(), it still passed. (Test pasted below for completeness.)

Then I noticed that my log output included the log that would be spit out by my error-handler -- so I _knew_ it must be hitting it. So, I tried debugging to see why it was calling RxJavaPlugins.onError(t);. When debugging, no crash. I.e., race condition. Now I'm not sure if the error is in my code or a bug in RxJava. Note that, in prod, I'm observing the stream on Schedulers.io().

Just now, in my CompletableSource, I added some logging (so I don't have to debug) to notify me if the emitter has been disposed. BOTH sources think it is not disposed. Clearly one wins.

Critical question:
I don't want to add the global error handler for this case because it feels like papering over a problem. Is there some way to synchronize my code such that the call to e.isDisposed() is not subject to race conditions?

@Test
public void testWouldFailInProd() throws Exception {
    // TestHelper is not available, but fortunately the source code is!
    List<Throwable> errors = Collections.synchronizedList(new LinkedList<>());
    RxJavaPlugins.setErrorHandler(errors::add);

    try {
        Single<Integer> s1 = Single.just(1)
                // I'm using `compose()` to more accurately reflect my prod code
                .compose(stream ->
                        stream.delaySubscription(Completable.create(e -> {
                            // in prod, both this and the other call resolve to `true`
                            if (!e.isDisposed()) {
                                e.onError(new TestException("haha"));
                            }
                        })));

        Single<Integer> s2 = Single.just(2)
                .compose(stream ->
                        stream.delaySubscription(Completable.create(e -> {
                            // in prod, both this and the other call resolve to `true`
                            if (!e.isDisposed()) {
                                e.onError(new TestException("haha"));
                            }
                        })));

        Single.zip(s1, s2, (i, j) -> true)
                .test()
                .assertFailure(TestException.class);

        assertTrue(errors.toString(), errors.isEmpty());
    } finally {
        RxJavaPlugins.reset();
    }
}

private static class TestException extends Exception {
    TestException(String msg) {
        super(msg);
    }
}

I don't want to add the global error handler for this case because it feels like papering over a problem. Is there some way to synchronize my code such that the call to e.isDisposed() is not subject to race conditions?

This is not possible.

I'm using Single.zip() to observe two streams in parallel

This is critical information: with zip, both sources can fail, but only one error can win, and the other surfaces as an extra error. There is no delayError option for Single, only for Observable/Flowable, so you can either convert your flows back or factor out this network check before zipping the sources.

My apologies for leaving that out earlier. Thanks for your help. I'll take your advice and see if I can either factor out the network check or simply use/convert to Observable/Flowable instead.

Just to close the loop on this (sorry for the email spam), factoring out the network check to the Single.zip() level works perfectly.
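The "only one error can win" point can be sketched in plain Java. This is a simplified model under stated assumptions, not RxJava's implementation: `OneShotDownstream` is a hypothetical class that accepts a single terminal event, and its `undeliverable` list stands in for a global error hook. Both sources are assumed to have already passed their `isDisposed()` check, which is the situation described in the thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

final class FirstErrorWins {

    /** Hypothetical model of a downstream that accepts exactly one terminal event. */
    static final class OneShotDownstream {
        private final AtomicReference<Throwable> delivered = new AtomicReference<>();
        private final List<Throwable> undeliverable = new CopyOnWriteArrayList<>();

        void onError(Throwable t) {
            // Only the first caller swaps null -> t; any later error has nowhere to go.
            if (!delivered.compareAndSet(null, t)) {
                undeliverable.add(t); // stand-in for a global error hook
            }
        }
    }

    /** Lets two failing sources signal from separate threads; returns {delivered, undeliverable}. */
    static int[] race() {
        OneShotDownstream downstream = new OneShotDownstream();
        Runnable failingSource =
                () -> downstream.onError(new IllegalStateException("no network"));

        Thread first = new Thread(failingSource);
        Thread second = new Thread(failingSource);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int deliveredCount = downstream.delivered.get() == null ? 0 : 1;
        return new int[] {deliveredCount, downstream.undeliverable.size()};
    }

    public static void main(String[] args) {
        int[] counts = race();
        System.out.println("delivered=" + counts[0] + ", undeliverable=" + counts[1]);
    }
}
```

Whichever thread runs first, one error is delivered and the other is left over. A check made before signaling cannot prevent this, since both sources can pass it before either signals. Running the network check once, at the zip level, leaves only a single possible error signal.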
