RxJava: onError method not called

Created on 4 Jun 2019 · 2 Comments · Source: ReactiveX/RxJava

Simple code like this, which just tests throwing an exception in the onNext method.
The onError method is not called, and there is no crash message.


Observable.merge(Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}), Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(3);
        emitter.onNext(4);
        emitter.onComplete();
    }
}))
        .subscribeOn(Schedulers.io())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("print", "onNext-" + integer);
                throw new RuntimeException("Test onNext exception");
            }

            @Override
            public void onError(Throwable e) {
                Log.d("print", "onError-" + e);
            }

            @Override
            public void onComplete() {
                Log.d("print", "onComplete");
            }
        });
2.x Question

Most helpful comment

Observer.onNext must not throw an exception. When you implement Observer directly, you have to manage exceptions inside the onXXX methods yourself. The Javadoc describes this case:

The implementations of the onXXX methods should avoid throwing runtime exceptions other than the following cases (see Rule 2.13 of the Reactive Streams specification):

  • If the argument is null, the methods can throw a NullPointerException. Note though that RxJava prevents nulls to enter into the flow and thus there is generally no need to check for nulls in flows assembled from standard sources and intermediate operators.
  • If there is a fatal error (such as VirtualMachineError).

Violating Rule 2.13 results in undefined flow behavior. Generally, the following can happen:

  • An upstream operator turns it into an onError(java.lang.Throwable) call.
  • If the flow is synchronous, the ObservableSource.subscribe(Observer) throws instead of returning normally.
  • If the flow is asynchronous, the exception propagates up to the component (Scheduler or Executor) providing the asynchronous boundary the code is running and either routes the exception to the global RxJavaPlugins.onError(Throwable) handler or the current thread's Thread.UncaughtExceptionHandler.uncaughtException(Thread, Throwable) handler.
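
The asynchronous case in the last bullet can be illustrated without RxJava. The following is only a rough sketch: a plain Thread stands in for the Scheduler, and the class and method names (AsyncBoundarySketch, runOnWorker) are made up for this example. It shows that an exception thrown by a callback on another thread never reaches the code that started the work; it ends up in that thread's uncaught-exception handler instead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AsyncBoundarySketch {

    /** Runs a throwing callback on a worker thread and returns what the handler saw. */
    static String runOnWorker() {
        AtomicReference<String> seenByHandler = new AtomicReference<>("nothing");
        CountDownLatch handled = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            // Plays the role of an onNext that violates Rule 2.13.
            throw new RuntimeException("Test onNext exception");
        });
        // The exception surfaces here, not in the code that called start().
        worker.setUncaughtExceptionHandler((thread, error) -> {
            seenByHandler.set(error.getMessage());
            handled.countDown();
        });

        worker.start(); // returns normally; the caller sees no exception
        try {
            handled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seenByHandler.get();
    }

    public static void main(String[] args) {
        System.out.println("handler saw: " + runOnWorker());
    }
}
```

In a real RxJava flow the Scheduler sits at this boundary, so which handler ends up receiving the exception depends on the operators and scheduler involved.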

From the Observable's perspective, an Observer is the end consumer thus it is the Observer's responsibility to handle the error case and signal it "further down". This means unreliable code in the onXXX methods should be wrapped into try-catches, specifically in onError(Throwable) or onComplete(), and handled there (for example, by logging it or presenting the user with an error dialog). However, if the error would be thrown from onNext(Object), Rule 2.13 mandates the implementation calls Disposable.dispose() and signals the exception in a way that is adequate to the target context, for example, by calling onError(Throwable) on the same Observer instance.
If, for some reason, the Observer won't follow Rule 2.13, the Observable.safeSubscribe(Observer) can wrap it with the necessary safeguards and route exceptions thrown from onNext into onError and route exceptions thrown from onError and onComplete into the global error handler via RxJavaPlugins.onError(Throwable).
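
To make the idea of such a safeguard concrete, here is a simplified sketch of a wrapper that routes exceptions from onNext into onError. It is not RxJava's actual implementation: the Disposable and Observer interfaces below are stand-ins that only mirror the shape of the RxJava 2 types so the example compiles without the library, and GuardingObserver and run are hypothetical names. The sketch also skips the guarding of onError and onComplete that the text above describes.

```java
import java.util.ArrayList;
import java.util.List;

class SafeWrapperSketch {

    // Stand-ins mirroring the shape of RxJava 2's Disposable and Observer
    // (assumption: declared here only so the sketch needs no dependency).
    interface Disposable { void dispose(); }

    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T t);
        void onError(Throwable e);
        void onComplete();
    }

    /** Hypothetical wrapper: exceptions thrown by the wrapped onNext go to its onError. */
    static final class GuardingObserver<T> implements Observer<T> {
        private final Observer<T> actual;
        private Disposable upstream;
        private boolean done;

        GuardingObserver(Observer<T> actual) { this.actual = actual; }

        @Override public void onSubscribe(Disposable d) {
            upstream = d;
            actual.onSubscribe(d);
        }

        @Override public void onNext(T t) {
            if (done) return;
            try {
                actual.onNext(t);
            } catch (RuntimeException ex) {
                upstream.dispose(); // stop the source first
                onError(ex);        // then signal the failure exactly once
            }
        }

        @Override public void onError(Throwable e) {
            if (done) return;
            done = true;
            actual.onError(e);
        }

        @Override public void onComplete() {
            if (done) return;
            done = true;
            actual.onComplete();
        }
    }

    /** Drives the wrapper with a hypothetical synchronous source emitting 1..4. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observer<Integer> throwing = new Observer<Integer>() {
            @Override public void onSubscribe(Disposable d) { }
            @Override public void onNext(Integer i) {
                log.add("onNext-" + i);
                throw new RuntimeException("Test onNext exception");
            }
            @Override public void onError(Throwable e) { log.add("onError-" + e.getMessage()); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        GuardingObserver<Integer> safe = new GuardingObserver<>(throwing);
        boolean[] disposed = {false};
        safe.onSubscribe(() -> disposed[0] = true);
        for (int i = 1; i <= 4 && !disposed[0]; i++) {
            safe.onNext(i);
        }
        if (!disposed[0]) safe.onComplete();
        return log; // [onNext-1, onError-Test onNext exception]
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With the real library, none of this has to be written by hand: passing the Observer to Observable.safeSubscribe(Observer) is the call the Javadoc points to.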

So either use try-catch and call onError yourself or use the lambda-subscribe() method which does capture and relay exceptions thrown by the lambda functions.
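
The first option, a try-catch inside onNext that disposes and calls onError itself, might look roughly like the sketch below. It rewrites the Observer from the question under a few assumptions: the Disposable and Observer interfaces are stand-ins mirroring the RxJava 2 types (so the example runs without the library), a list replaces Log.d, and PrintObserver and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class GuardedOnNextSketch {

    // Stand-ins mirroring the shape of RxJava 2's Disposable and Observer
    // (assumption: with the real library, import the RxJava types instead).
    interface Disposable { void dispose(); }

    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T t);
        void onError(Throwable e);
        void onComplete();
    }

    /** The Observer from the question, rewritten so that onNext never throws. */
    static final class PrintObserver implements Observer<Integer> {
        final List<String> log = new ArrayList<>();
        private Disposable upstream;
        private boolean done;

        @Override public void onSubscribe(Disposable d) {
            upstream = d; // keep it so onNext can stop the flow
        }

        @Override public void onNext(Integer integer) {
            if (done) return;
            try {
                log.add("onNext-" + integer);
                throw new RuntimeException("Test onNext exception");
            } catch (RuntimeException ex) {
                upstream.dispose(); // Rule 2.13: cancel the upstream
                onError(ex);        // and signal the error on this Observer
            }
        }

        @Override public void onError(Throwable e) {
            if (done) return;
            done = true;
            log.add("onError-" + e.getMessage());
        }

        @Override public void onComplete() {
            if (done) return;
            done = true;
            log.add("onComplete");
        }
    }

    /** Drives the observer with a hypothetical synchronous source emitting 1..4. */
    static List<String> run() {
        PrintObserver observer = new PrintObserver();
        boolean[] disposed = {false};
        observer.onSubscribe(() -> disposed[0] = true);
        for (int i = 1; i <= 4 && !disposed[0]; i++) {
            observer.onNext(i);
        }
        if (!disposed[0]) observer.onComplete();
        return observer.log; // [onNext-1, onError-Test onNext exception]
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The done flag keeps onError from firing twice and suppresses any late onNext or onComplete after the failure. The second option, the lambda overload of subscribe(), makes this boilerplate unnecessary because it relays exceptions from the lambdas for you.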

All 2 comments

Thank you very much.
