compile "io.reactivex.rxjava2:rxjava:2.1.0"
https://github.com/originx/rxjava2disposenpe
To reproduce, run ./gradlew run
Commit with the problem: 7aa3b598599ce9674817ae9a2396ac0aca7a28f7
Commit that works: f60076a837d9cdc3cb66e00b9178385b82b53692
I stripped the stream we use in our project down to a bare-bones sample project, keeping it as close to the original as possible.
The project tries to fetch something from the network, but I made the HTTP client fail early so that it throws an exception.
Expected: the exception is propagated to the subscriber through onError.
Actual: the exception is propagated, but dispose is called twice. On the second call the disposable is null, which produces a CompositeException containing an NPE.
We use Single -> lift (with DisposableSingleObserver) -> toObservable and subscribe to the resulting Observable. The commit with the problem is 7aa3b598599ce9674817ae9a2396ac0aca7a28f7
If we switch everything to Single, there is no "double dispose" and the error handling works as expected.
Commit that works:
f60076a837d9cdc3cb66e00b9178385b82b53692
Let me know if you need more info.
Could you provide the stack trace of the NPE? That would point us right at the bug...
Yes, I'm sorry, I missed that one:
Exception in thread "main" io.reactivex.exceptions.CompositeException: 2 exceptions occurred.
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
at io.reactivex.Single.subscribe(Single.java:2703)
at io.reactivex.internal.operators.single.SingleLift.subscribeActual(SingleLift.java:44)
at io.reactivex.Single.subscribe(Single.java:2703)
at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:34)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.Observable.subscribe(Observable.java:10828)
at io.reactivex.Observable.subscribe(Observable.java:10757)
at bug.Main.main(Main.java:16)
ComposedException 1 :
java.net.SocketException: 324.234.123.123
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:435)
at java.net.Socket.connect(Socket.java:589)
at okhttp3.internal.platform.Platform.connectSocket(Platform.java:124)
at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:220)
at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:146)
at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:186)
at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:179)
at okhttp3.RealCall.execute(RealCall.java:63)
at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:41)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
at io.reactivex.Single.subscribe(Single.java:2703)
at io.reactivex.internal.operators.single.SingleLift.subscribeActual(SingleLift.java:44)
at io.reactivex.Single.subscribe(Single.java:2703)
at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:34)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.Observable.subscribe(Observable.java:10828)
at io.reactivex.Observable.subscribe(Observable.java:10757)
at bug.Main.main(Main.java:16)
ComposedException 2 :
java.lang.NullPointerException
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.disposeAll(ObservableFlatMap.java:503)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.checkTerminate(ObservableFlatMap.java:492)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drainLoop(ObservableFlatMap.java:331)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drain(ObservableFlatMap.java:323)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onError(ObservableFlatMap.java:288)
at io.reactivex.internal.operators.single.SingleToObservable$SingleToObservableObserver.onError(SingleToObservable.java:65)
at bug.rx.SequenceOperators$HttpErrorOperator$1.onError(SequenceOperators.java:52)
at io.reactivex.internal.operators.observable.ObservableSingleSingle$SingleElementObserver.onError(ObservableSingleSingle.java:95)
at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:55)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
at io.reactivex.Single.subscribe(Single.java:2703)
at io.reactivex.internal.operators.single.SingleLift.subscribeActual(SingleLift.java:44)
at io.reactivex.Single.subscribe(Single.java:2703)
at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:34)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.internal.operators.observable.ObservableFlatMap.subscribeActual(ObservableFlatMap.java:55)
at io.reactivex.Observable.subscribe(Observable.java:10842)
at io.reactivex.Observable.subscribe(Observable.java:10828)
at io.reactivex.Observable.subscribe(Observable.java:10757)
at bug.Main.main(Main.java:16)
This is the stack trace from plain Java. The one from Android is similar, and perhaps even more specific in pinpointing the problem:
Fatal Exception: java.lang.NullPointerException: Attempt to invoke interface method 'void io.reactivex.disposables.Disposable.dispose()' on a null object reference
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.disposeAll(ObservableFlatMap.java:498)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.checkTerminate(ObservableFlatMap.java:490)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drainLoop(ObservableFlatMap.java:331)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.drain(ObservableFlatMap.java:323)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onError(ObservableFlatMap.java:288)
at io.reactivex.internal.operators.single.SingleToObservable$SingleToObservableObserver.onError(SingleToObservable.java:65)
at io.reactivex.internal.operators.single.SingleMap$MapSingleObserver.onError(SingleMap.java:68)
at de.payback.core.reactive.SequenceOperators$HttpErrorOperator$1.onError(SequenceOperators.java:61)
at io.reactivex.internal.operators.single.SingleObserveOn$ObserveOnSingleObserver.run(SingleObserveOn.java:79)
at io.reactivex.Scheduler$1.run(Scheduler.java:138)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:762)
Okay. There is no bug in RxJava but in your lifted operator which should call onSubscribe(Disposable) on the downstream when it gets called by onSubscribe from its upstream. For example, override onStart:
@Override
protected void onStart() {
    // Forward the subscription so the downstream receives a non-null Disposable.
    observer.onSubscribe(this);
}
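To illustrate why the missing onSubscribe call ends in an NPE, here is a minimal sketch that uses only the Java standard library. It is a simplified model, not RxJava's actual implementation: the Disposable and Observer interfaces below are local stand-ins for the RxJava types, and LiftSketch, Downstream, LiftedObserver, and failsWithNpe are hypothetical names chosen for this example. The downstream stores whatever Disposable it receives in onSubscribe and disposes it on error, so if the lifted observer never forwards the subscription, that field is still null when the error arrives.

```java
final class LiftSketch {

    /** Local stand-in for RxJava's Disposable (simplified, assumption). */
    interface Disposable {
        void dispose();
    }

    /** Local stand-in for an observer, reduced to the error path (assumption). */
    interface Observer {
        void onSubscribe(Disposable d);
        void onError(Throwable e);
    }

    /** Models a downstream that disposes its stored upstream when an error arrives. */
    static final class Downstream implements Observer {
        private Disposable upstream; // stays null if onSubscribe is never called

        @Override
        public void onSubscribe(Disposable d) {
            upstream = d;
        }

        @Override
        public void onError(Throwable e) {
            upstream.dispose(); // throws NullPointerException if onSubscribe was skipped
        }
    }

    /** Hypothetical lifted observer that can forward onSubscribe or omit it. */
    static final class LiftedObserver implements Observer, Disposable {
        private final Observer downstream;
        private final boolean forwardOnSubscribe;
        private Disposable upstream;

        LiftedObserver(Observer downstream, boolean forwardOnSubscribe) {
            this.downstream = downstream;
            this.forwardOnSubscribe = forwardOnSubscribe;
        }

        @Override
        public void onSubscribe(Disposable d) {
            upstream = d;
            if (forwardOnSubscribe) {
                downstream.onSubscribe(this); // the call the fix adds
            }
        }

        @Override
        public void onError(Throwable e) {
            downstream.onError(e);
        }

        @Override
        public void dispose() {
            if (upstream != null) {
                upstream.dispose();
            }
        }
    }

    /** Returns true if delivering an error through the chain raises a NullPointerException. */
    static boolean failsWithNpe(boolean forwardOnSubscribe) {
        LiftedObserver lifted = new LiftedObserver(new Downstream(), forwardOnSubscribe);
        lifted.onSubscribe(() -> { }); // the upstream subscribes with a no-op Disposable
        try {
            lifted.onError(new RuntimeException("simulated network failure"));
            return false;
        } catch (NullPointerException npe) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("without onSubscribe: NPE = " + failsWithNpe(false)); // true
        System.out.println("with onSubscribe:    NPE = " + failsWithNpe(true));  // false
    }
}
```

In the real operator, the equivalent of the forwardOnSubscribe branch is the onStart override shown above.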
Interesting. I will have to read up on operators again, then.
Thank you for the clarification.