RxJava: 2.x: .zipWith() with the same shared source crashes the app

Created on 6 Sep 2017 · 10 Comments · Source: ReactiveX/RxJava

The following code crashes the app with an IllegalStateException (using version 2.1.3):

        Observable myObservable = Observable.just(1)
            .flatMap(i -> {
                throw new IllegalStateException();
            }).share();

        myObservable
            .zipWith(myObservable, Pair::new)
            .subscribe(pair -> {
                //ignore
            }, throwable -> {
                //ignore
            });

If I comment out the `.zipWith(myObservable, Pair::new)` line, everything works. Is that the expected behavior?

While investigating the issue, I found that in `onError(Throwable e)` the `PublishObserver` object loops through 2 inner disposables, both containing the same instance of `ZipCoordinator`.

The first time the publish observer propagates the error to `inner.child.onError(e)`, the zip coordinator sets itself to 0 in `innerError(...)`:

    void innerError(Throwable ex, int index) {
        if (getAndSet(0) > 0) {
            disposeExcept(index);
            actual.onError(ex);
        } else {
            RxJavaPlugins.onError(ex);
        }
    }

But since `inner.child.onError(e)` is called again, the error gets propagated to `RxJavaPlugins.onError(ex);` and crashes the app. It seems to me that, since this is a shared source, the error should be propagated only once, no?
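The guard described above can be modeled without RxJava. This is a minimal sketch, not the library's code: the class `ErrorGuardSketch`, its `downstream` and `globalHandler` lists, and the initial counter value are assumptions made for illustration. Only the `getAndSet(0) > 0` check is taken from the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the coordinator's error guard; not RxJava's actual class. */
class ErrorGuardSketch {
    // Assumption: the counter starts at the number of sources being zipped.
    private final AtomicInteger remaining;
    // Stand-in for the observer's onError callback.
    final List<Throwable> downstream = new ArrayList<>();
    // Stand-in for RxJavaPlugins.onError, the global error handler.
    final List<Throwable> globalHandler = new ArrayList<>();

    ErrorGuardSketch(int sources) {
        this.remaining = new AtomicInteger(sources);
    }

    void innerError(Throwable ex) {
        if (remaining.getAndSet(0) > 0) {
            // First error: the counter was still positive, so the observer gets it.
            downstream.add(ex);
        } else {
            // Any later error finds the counter at 0 and cannot reach the observer.
            globalHandler.add(ex);
        }
    }

    public static void main(String[] args) {
        ErrorGuardSketch guard = new ErrorGuardSketch(2);
        Throwable error = new IllegalStateException("boom");
        guard.innerError(error); // delivered to the observer
        guard.innerError(error); // same exception, now routed to the global handler
        System.out.println("downstream: " + guard.downstream.size());
        System.out.println("global handler: " + guard.globalHandler.size());
    }
}
```

In this model the same exception instance ends up in both lists, which matches the symptom reported here: the observer handles the error once, and the second delivery has nowhere to go but the global handler.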

2.x Question

All 10 comments

share() doesn't work here as you expect because the first time it subscribes to its source, the process fails and the reference count goes back to zero. Then zipWith subscribes to it again, it triggers another execution and failure, but this second time, the error can't be signalled to the downstream and ends up in the global error handler.
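The re-subscription described in this comment can be illustrated with a toy model. It is only a sketch under simplifying assumptions (a synchronous source, a single thread) and does not reproduce RxJava's `share()` internals; `RefCountSketch` and its fields are hypothetical names.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Toy model of a ref-counted shared source; not RxJava's implementation. */
class RefCountSketch<T> {
    private final Supplier<T> source; // synchronous source that may throw
    private int subscribers;          // current reference count
    int executions;                   // how many times the source actually ran

    RefCountSketch(Supplier<T> source) {
        this.source = source;
    }

    void subscribe(Consumer<T> onNext, Consumer<Throwable> onError) {
        if (++subscribers == 1) {
            // The count went from 0 to 1: connect, which runs the source.
            executions++;
            T item;
            try {
                item = source.get();
            } catch (RuntimeException ex) {
                // The source terminated with an error: the count drops back to zero.
                subscribers = 0;
                onError.accept(ex);
                return;
            }
            // The source terminated normally: the count also drops back to zero.
            subscribers = 0;
            onNext.accept(item);
        }
    }

    public static void main(String[] args) {
        RefCountSketch<Integer> shared = new RefCountSketch<>(() -> {
            throw new IllegalStateException("boom");
        });
        shared.subscribe(v -> { }, e -> System.out.println("first subscriber: " + e));
        shared.subscribe(v -> { }, e -> System.out.println("second subscriber: " + e));
        System.out.println("source executions: " + shared.executions);
    }
}
```

Because the first subscription fails before the second one starts, the count is back at zero and the second subscription runs the source again. With zipWith, both subscriptions report to the same coordinator, so the second failure is the one that cannot be delivered downstream.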

The fact that ObservableZip subscribes to the sources whether or not the operator has reached its terminal state is debatable - many other operators stop subscribing to their set of sources once a terminal state has been reached during the subscription process.

Thanks for the quick response.

But I'm not sure how to address this. It crashes even if I use an onError... operator, such as:

        myObservable //
            .zipWith(myObservable.onErrorReturnItem(2), Pair::new) //
            .subscribe(o -> {
                //ignore
            }, throwable -> {
                //ignore
            });

For the general case, please read the error handling section of the wiki.

Btw, you have mixed up the types in your example and post. You mentioned Single but showed an example with Observable.

Thanks and sorry for the confusion. But the problem is still present unless I'm missing something: If I use RxJavaPlugins.setErrorHandler() I'm not sure how to ignore the exceptions in the case at hand because it's the same exception (one that I care about and handle correctly in my observer the first time) being thrown a second time.

edit to add: the exception is _not_ even wrapped in an UndeliverableException.

By setting the error handler, it is up to the error handler what it does with the exception. There is an example in the wiki for ignoring certain exceptions and crashing on others.

OK, thanks a lot for the explanations. I'm not conveying the problem properly or I'm not understanding the .zipWith and the error handler approach properly. But I will figure something out.

Try publish().autoConnect(2) instead of share().

That works, thanks. But I can't use this solution because I have a generic source, used in different places, to which any number of observers can connect. It really needs to work with refCount().

Alternative solutions:

  • I could catch the error down the chain:

        myObservable
            .zipWith(myObservable, Pair::new)
            .onErrorReturn...

but that will force me to mind the places where this behavior with zipWith() takes place. It also forces the error logic to be moved to onNext() for those cases because I would need to wrap the error in a valid "result-like" object.

  • I could use RxJavaPlugins.setErrorHandler() to not crash on all errors since I know in my app all of them are being handled by an observer. But this is hiding the problem and might create side effects later.

Neither of these two options seems good enough.

The original reactive concept defines max one error per Observer and we can't do much about that: we either drop it, send it to some handler, or the user doesn't let an exception bubble back into RxJava. That's what io.reactivex.Notification<T> should allow you to do: turn an error into an item.
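To make the "error as an item" idea concrete, here is a small library-free sketch. `Outcome` is a hypothetical value-or-error wrapper, loosely analogous in spirit to `io.reactivex.Notification<T>`; it is not that class and does not use its API.

```java
import java.util.concurrent.Callable;

/** Hypothetical value-or-error wrapper; an illustration, not io.reactivex.Notification. */
final class Outcome<T> {
    final T value;         // null when this outcome carries an error
    final Throwable error; // null when this outcome carries a value

    private Outcome(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    /** Runs the task and captures either its result or the exception it throws. */
    static <T> Outcome<T> capture(Callable<T> task) {
        try {
            return new Outcome<>(task.call(), null);
        } catch (Exception ex) {
            return new Outcome<>(null, ex);
        }
    }

    boolean isError() {
        return error != null;
    }

    public static void main(String[] args) {
        Outcome<Integer> ok = Outcome.capture(() -> 1);
        Outcome<Integer> failed = Outcome.capture(() -> {
            throw new IllegalStateException("boom");
        });
        // Both outcomes are ordinary objects, so a consumer inspects them in its
        // item callback instead of receiving a terminal error signal.
        System.out.println("ok is error: " + ok.isError());
        System.out.println("failed is error: " + failed.isError());
    }
}
```

The trade-off is the one raised earlier in the thread: the error handling logic moves into the item path, since the failure now arrives as a value rather than through the error callback.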

> The original reactive concept defines max one error per Observer and we can't do much about that

I agree but then the error should be propagated down from the .zipWith() like I tried here. As it is in my original example, I have "one" observer but RxJava is trying to propagate the same error twice.
