Rxjava: NPE double dispose called Single lift operator and toObservable

Created on 11 May 2017  路  4Comments  路  Source: ReactiveX/RxJava

RxJava version used:

compile "io.reactivex.rxjava2:rxjava:2.1.0"

Reproduction project:

https://github.com/originx/rxjava2disposenpe

just run ./gradlew run

Commit with problem:

7aa3b598599ce9674817ae9a2396ac0aca7a28f7

Commit that works:

f60076a837d9cdc3cb66e00b9178385b82b53692

I tried to get as close but barebone to the stream that we have in our project and stripped it down in a sample project.

Technically the project tries to fetch something from the network (but I made the http client fail early so it produces an exception).

Expected:

Exception is propagated to the subscriber in the onError stream

Actual:

Exception is propagated, but dispose is called 2x, and second time we call it the disposable is null and this creates a ChainedException with NPE.

Remarks:

We use Single -> lift (with DisposableSingleObserver) -> toObservable and we subscribe to observable, commit with problem 7aa3b598599ce9674817ae9a2396ac0aca7a28f7

If we switch everything to singles there is no "double dispose" and the error handling works as expected
Commit that works:
f60076a837d9cdc3cb66e00b9178385b82b53692

Let me know if you need more info.

2.x Question

Most helpful comment

Okay. There is no bug in RxJava but in your lifted operator which should call onSubscribe(Disposable) on the downstream when it gets called by onSubscribe from its upstream. For example, override onStart:

@Override
protected void onStart() {
    observer.onSubscribe(this);
}

All 4 comments

Cold you provide the stacktrace of the NPE? That would point us right at the bug...

yes im sorry I missed that one:

Exception in thread "main" io.reactivex.exceptions.CompositeException: 2 exceptions occurred. 
        at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:58)
        at io.reactivex.Observable.subscribe(Observable.java:10842)
        at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
        at io.reactivex.Single.subscribe(Single.java:2703)
        at io.reactivex.internal.operators.single.SingleLift.subscribeActual(SingleLift.java:44)
        at io.reactivex.Single.subscribe(Single.java:2703)
        at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:34)
        at io.reactivex.Observable.subscribe(Observable.java:10842)
        at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
        at io.reactivex.Observable.subscribe(Observable.java:10842)
        at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
        at io.reactivex.Observable.subscribe(Observable.java:10842)
        at io.reactivex.Observable.subscribe(Observable.java:10828)
        at io.reactivex.Observable.subscribe(Observable.java:10757)
        at bug.Main.main(Main.java:16)
  ComposedException 1 :
        java.net.SocketException: 324.234.123.123
                at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:435)
                at java.net.Socket.connect(Socket.java:589)
                at okhttp3.internal.platform.Platform.connectSocket(Platform.java:124)
                at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:220)
                at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:146)
                at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:186)
                at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
                at okhttp3.internal.connectiotion.newStream(StreamAllocation.java:100)
                at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
                at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
                at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
                at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
                at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
                at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
                at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
                at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
                at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
                at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
                at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
                at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:179)
                at okhttp3.RealCall.execute(RealCall.java:63)
                at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)
                at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)
                at io.reactivex.Observable.subscribe(Observable.java:10842)
                at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
                at io.reactivex.Single.subscribe(Single.java:2703)
                at io.reactivex.internal.operators.single.SingleLift.subscribeActual(SingleLift.java:44)
                at io.reactivex.Single.subscribe(Single.java:2703)
                at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:34)
                at io.reactivex.Observable.subscribe(Observable.java:10842)
                at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
                at io.reactivex.Observable.subscribe(Observable.java:10842)
                at io.reactivex.internal.operators.obsableFlatMap.subscribeActual(ObservableFlatMap.java:55)
                at io.reactivex.Observable.subscribe(Observable.java:10842)
                at io.reactivex.Observable.subscribe(Observable.java:10828)
                at io.reactivex.Observable.subscribe(Observable.java:10757)
                at bug.Main.main(Main.java:16)
  ComposedException 2 :
        java.lang.NullPointerException
                at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.disposeAll(ObservableFlatMap.java:503)
                at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.checkTerminate(ObservableFlatMap.java:492)
                at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drainLoop(ObservableFlatMap.java:331)
                at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drain(ObservableFlatMap.java:323)
                at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onError(ObservableFlatMap.java:288)
                at io.reactivex.internal.operators.single.SingleToObservable$SingleToObservableObserver.onError(SingleToObservable.java:65)
                at bug.rx.SequenceOperators$HttpErrorOperator$1.onError(SequenceOperators.java:52)
                at io.reactivex.internal.operators.observable.ObservableSingleSingle$SingleElementObserver.onError(ObservableSingleSingle.java:95)
                at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:55)
                at io.reactivex.Observable.subscribe(Observable.java:10842)
                at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
                at io.reactivex.Single.subscribe(Single.java:2703)
                at io.reactivex.internal.operators.single.SingleLift.subscribeActual(SingleLift.java:44)
                at io.reactivex.Single.subscribe(Single.java:2703)
                at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:34)
                at io.reactivex.Observable.subscribe(Observable.java:10842)
                at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
                at io.reacti.subscribe(Observable.java:10842)
                at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
                at io.reactivex.Observable.subscribe(Observable.java:10842)
                at io.reactivex.Observable.subscribe(Observable.java:10828)
                at io.reactivex.Observable.subscribe(Observable.java:10757)
                at bug.Main.main(Main.java:16)

This is bug stacktrace on pure java, on Android is similar maybe even more specific in pinpointing it:


Fatal Exception: java.lang.NullPointerException: Attempt to invoke interface method 'void io.reactivex.disposables.Disposable.dispose()' on a null object reference
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.disposeAll(ObservableFlatMap.java:498)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.checkTerminate(ObservableFlatMap.java:490)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drainLoop(ObservableFlatMap.java:331)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drain(ObservableFlatMap.java:323)
       at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onError(ObservableFlatMap.java:288)
       at io.reactivex.internal.operators.single.SingleToObservable$SingleToObservableObserver.onError(SingleToObservable.java:65)
       at io.reactivex.internal.operators.single.SingleMap$MapSingleObserver.onError(SingleMap.java:68)
       at de.payback.core.reactive.SequenceOperators$HttpErrorOperator$1.onError(SequenceOperators.java:61)
       at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.run(SingleObserveOn.java:79)
       at io.reactivex.Scheduler$1.run(Scheduler.java:138)
       at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
       at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
       at java.util.concurrent.FutureTask.run(FutureTask.java:237)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
       at java.lang.Thread.run(Thread.java:762)

Okay. There is no bug in RxJava but in your lifted operator which should call onSubscribe(Disposable) on the downstream when it gets called by onSubscribe from its upstream. For example, override onStart:

@Override
protected void onStart() {
    observer.onSubscribe(this);
}

Interesting I have to reread again on the operators again then.

thank you for the clarification.

Was this page helpful?
0 / 5 - 0 ratings