RxJava: How to throw an exception from within an Observer

Created on 19 Mar 2014 · 29 Comments · Source: ReactiveX/RxJava

Hi,

I am trying to make the app fail fast when something is going wrong from the Observer.
I was surprised to see that the Exception was swallowed by RxJava.

Here is a sample I tried:

    Observable.error(new Exception()).subscribe(new Observer<Object>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            throw new IllegalStateException("This should crash the app");
        }

        @Override
        public void onNext(Object o) {
        }
    });

Do I need to do anything specific to fail from within an Observer?

All 29 comments

According to the Rx contract, onError will be called only once, so you need to handle the exception in onError yourself.

I understand that onError is called only once, I'm not trying to forward the exception to something else. What I want to do is be able to throw a RuntimeException to make the app terminate.

RxJava will catch Exceptions and most Errors. You cannot propagate an Exception out of RxJava. I'm curious why you have such a requirement.

I get why all errors are caught within the flow of RxJava (i.e. observables, functions, etc.) to keep the monadic properties, but once you reach the Observer the flow is consumed, and this is the point in the app where you can act on the events. The idea here is to fail fast in order to easily identify developer errors during the development phase. After reading this closed issue, https://github.com/Netflix/RxJava/issues/650, I was under the assumption that RxJava should not be swallowing exceptions from within an Observer.

#650 is out of date. The Observer is used not only by users but also by RxJava itself, to build operators. Also, an Observable is often asynchronous, which means the Observer often runs on a thread other than the main thread. Even if RxJava threw the RuntimeException, it would only crash that thread, which is still hard to debug. I think forcing users to handle the exception is the better idea.
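The point about threads can be illustrated without RxJava at all. The following is a minimal sketch in plain Java, where WorkerCrashSketch and runAndCapture are hypothetical names: an exception thrown on a worker thread ends only that thread, and the calling thread learns about it only if a handler is installed.

```java
import java.util.concurrent.atomic.AtomicReference;

class WorkerCrashSketch {
    /** Runs a throwing task on a worker thread and returns what the thread's handler saw. */
    static Throwable runAndCapture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("This should crash the app");
        }, "worker");
        // Without a handler the JVM would only print the stack trace for this thread.
        worker.setUncaughtExceptionHandler((thread, error) -> seen.set(error));
        worker.start();
        try {
            worker.join(); // the calling thread keeps running after the worker dies
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }
}
```

Calling WorkerCrashSketch.runAndCapture() returns normally with the captured IllegalStateException; the caller is never interrupted by it.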

I didn't realise that some operator might be using Observers internally, makes more sense now.
I am handling the exceptions properly; I just wanted to figure out a way to make unexpected failures resulting from developer errors easier to spot. I suppose I'll rely on visual indication and logs for now.

This is an odd use case ... and it skips over the Exceptions.throwIfFatal() line in subscribe and swallows the error: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L7094

Not ideal at all.

I think we may need to make SafeSubscriber use an Exception other than RuntimeException that is considered "fatal" so we don't end up in loops where observer.onError throws and then subscribe tries to re-send it right back to observer.onError again only to get ignored by SafeSubscriber that allows only a single terminal state.

See here for the RuntimeException that should get thrown all the way up: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java#L171

Sorry for reviving an old discussion.

We also run into the same issue, and the change that went in does not really help us. Rx contract or not, I think I agree with @Dorvaryn that certain errors should simply terminate the app, since they can signal developer error rather than anticipated runtime errors.

Good examples are precondition checks and assertions. Guava, for instance, throws IllegalArgumentException when a precondition fails. This will throw during an onNext call. Why would I want to handle this within the Rx call chain? As a developer I violated a method contract and should be informed immediately, by seeing the app crash. The same goes for null pointers.

The way this is dealt with right now:

  • a static Exceptions.throwIfFatal method, which cannot be overridden and which does not account for the cases mentioned above
  • a plugin system for errors, which is safeguarded against re-throwing errors, so this is not an option either

While I do see your points, I think there is definitely a case to be made for failing early, outside the Rx contract (since you _want_ to terminate the app, why would breaking the Rx contract matter anyway?) It would go a long way to make Rx based apps easier to debug, since more often than not, errors you expect to fail the app simply disappear in the safe guards applied by SafeSubscriber.

Actually, a much simpler way to solve this would be to change throwIfFatal to always rethrow Errors, and not just the 4 specific ones it checks for? That way we could wrap e.g. argument checks in AssertionError and have it terminate early.

Was there any particular reason why only 4 kinds of errors are rethrown from throwIfFatal? There's other kinds of errors like NoClassDefFoundError that should probably be considered fatal as well. I always assumed anything derived from Error is considered fatal anyway.

Generally, errors should be emitted to onError; otherwise systems can end up hanging and/or leaking resources because they never receive a terminal event (onCompleted/onError) and therefore never release user requests, unsubscribe for cleanup, etc. Only fatal errors are thrown, and the OnErrorNotImplementedException after the terminal event ensures everything receives an unsubscribe.

Sure, but why is something like NoClassDefFound not considered fatal? According to Java's Error doc, all subclasses are considered fatal.

The definition of fatal exceptions can definitely be changed. Here is what we will want to change: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/exceptions/Exceptions.java#L82

Would you be okay with replacing these 4 checks with instanceof Error?
I'm not sure if this will have a larger impact, but I feel that's the right
thing to do here
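To make the proposal concrete, here is a small sketch of the two definitions of "fatal" being compared. FatalCheckSketch and both method names are hypothetical, and the narrow list is only illustrative; it is not a copy of the checks in Exceptions.java.

```java
class FatalCheckSketch {
    /** Narrow variant: only a short, explicit list counts as fatal (illustrative list). */
    static boolean isFatalNarrow(Throwable t) {
        return t instanceof StackOverflowError
            || t instanceof OutOfMemoryError;
    }

    /** Proposed variant: anything derived from java.lang.Error counts as fatal. */
    static boolean isFatalAnyError(Throwable t) {
        return t instanceof Error;
    }
}
```

Under the second variant a NoClassDefFoundError or an AssertionError would be rethrown, while an IllegalArgumentException from a precondition check would still not be, unless it is wrapped in an AssertionError as suggested earlier in the thread.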

Perhaps. Should AssertionError be considered fatal, though? When we say fatal here, it really does mean the app may die or need to be killed, as user requests may be left hanging if this happens.

In our case, I feel an assertion failure should kill the app, but I see
that in different environments there might be different requirements around
such behavior. I was actually wondering if there is even a need to split up
Exceptions.throwIfFatal and the global error handler registered via
RxJavaPlugins. If we weaken the requirement of the global onError hook to
rethrow if desired, the logic from throwIfFatal could simply live in a
default onError hook, and would remain overridable by clients.

It also looks like a potential source of bugs due to duplication: maybe
it's by design, but the recently rewritten ExecutorScheduler does not
invoke throwIfFatal if it catches an error, only the global onError hook;
this makes for one half of the problem we're having, since it wraps all job
executions in a try/catch/finally block, so even if throwIfFatal would
rethrow say a StackOverflowError, the job would silently shut down and no
one ever sees that error. I guess it makes sense in terms of resilience,
but makes debugging hard, since all fatal errors disappear in a void.

The onError hook is only for logging and insights, not for decoration or throwing: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/plugins/RxJavaErrorHandler.java#L37 The plugin was specifically created to help identify when there is bad code swallowing errors in production environments and other similar diagnostic use cases. It was never intended for application logic. That's why it doesn't return anything and has a strict contract to never throw.

If ExecutorScheduler is swallowing fatal errors (primarily exceptions thrown from onError) then it is a bug.

The contract for error handling in Rx is:

1) All errors thrown by user-provided functions or by notifications to onNext and onCompleted must be passed to onError unless they are considered "fatal" (primarily to prevent OutOfMemory and StackOverflow scenarios).
2) If onError itself throws an exception there is no choice but to consider it fatal.
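The two rules above can be sketched in plain Java. This is not the real SafeSubscriber: SimpleObserver and Safe are hypothetical names, onCompleted is left out for brevity, and the fatal list is limited to the two cases named in rule 1.

```java
class ContractSketch {
    /** Hypothetical minimal observer; not the RxJava interface. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
    }

    /** Wraps an observer so that the two rules of the contract hold. */
    static final class Safe<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> actual;
        private boolean done;

        Safe(SimpleObserver<T> actual) {
            this.actual = actual;
        }

        @Override
        public void onNext(T value) {
            if (done) {
                return; // nothing is delivered after a terminal event
            }
            try {
                actual.onNext(value);
            } catch (StackOverflowError | OutOfMemoryError fatal) {
                throw fatal; // rule 1 exception: fatal errors are not routed
            } catch (Throwable t) {
                onError(t); // rule 1: everything else goes to onError
            }
        }

        @Override
        public void onError(Throwable error) {
            if (done) {
                return; // only a single terminal event is allowed
            }
            done = true;
            try {
                actual.onError(error);
            } catch (Throwable t) {
                // rule 2: onError itself failed, so there is nobody left to notify
                throw new RuntimeException("onError failed", t);
            }
        }
    }
}
```

With this wrapper, an IllegalArgumentException thrown from onNext reaches the wrapped onError exactly once, and an exception thrown by the wrapped onError escapes to the caller.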

A try/catch/finally isn't reliable when multiple threads are involved. This is why it's so important to route errors to onError and only throw if we have no other choice (i.e. onError itself throws): it is non-deterministic which thread the exception will be thrown on and which resources it will leave hanging.

Can we step back and clearly define the use case we're trying to solve? I want to understand if it's the actual Rx contract above that is in question, or just a bug in an operator or a choice of "fatal" exceptions that needs to be fixed.

I see, that makes sense.

The use case we're trying to address is discovering developer failure
early. In this specific instance we had a precondition check fail on a
method argument, which was invoked inside an RxJava call chain. The
precondition check fails with IllegalArgumentException, which is a
RuntimeException, which in turn is not considered fatal by RxJava so gets
caught in SafeSubscriber's safety net (again, there is no reasonable way to
recover from that error in the app.)

The result is that our app continues to run in a broken or undefined state,
since we continue execution after a precondition was violated. This is hard
to debug and not obvious to the programmer. What we want is to crash the
app.

What does your onError handler do with the exception when it receives it?

It checks it for its type and then decides if it should rethrow or report
silently into our crash logging service. We rethrow all Errors and all
RuntimeExceptions. Checked exceptions are either logged (depends on the
type as well, we don't log IOExceptions for instance) or are simply
ignored, because they are expected to be handled by the subscriber in a way
appropriate to the context of the subscriber
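A handler along the lines described above might look roughly like this. It is only a sketch: ErrorPolicySketch, handle and the reported list (standing in for a crash logging service) are hypothetical names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class ErrorPolicySketch {
    /** Stand-in for a crash logging service; hypothetical. */
    static final List<Throwable> reported = new ArrayList<>();

    /** Decides by type whether to rethrow, ignore, or report an error. */
    static void handle(Throwable t) {
        if (t instanceof Error) {
            throw (Error) t; // developer or VM failure: crash
        }
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t; // e.g. a failed precondition check: crash
        }
        if (t instanceof IOException) {
            return; // expected at runtime, not logged
        }
        reported.add(t); // other checked exceptions are reported silently
    }
}
```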

Since your final onError throws, it would result in it being treated as "fatal" and allowed to throw all the way up the stack.

On the other hand, if a developer doesn't implement onError, everything thrown gets wrapped in an OnErrorNotImplementedException, which also seems to result in the behavior you'd like.

Considering these two possibilities, what about this current state isn't working for you?

I believe part of the problem is ExecutorScheduler. Its run method is:

        @Override
        public void run() {
            if (isUnsubscribed()) {
                return;
            }
            try {
                actual.call();
            } catch (Throwable t) {
                RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
            } finally {
                unsubscribe();
            }
        }

That is, it catches all errors and loops them through the global onError hook, which is not allowed to throw. So even if SafeSubscriber rethrows, it would just end up being swallowed again.

That said, just from my perspective, what we could do is to let the developer decide what fatal means when an exception is caught. This could be as simple as providing a predicate isFatal (e.g. through the plugin APIs) that SafeSubscriber calls instead of the static Exceptions.throwIfFatal method, and would rethrow on true.
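As a rough illustration of that idea, the sketch below passes the predicate in directly instead of going through any plugin API. FatalPredicateSketch, run and hooked are hypothetical names, and the hooked list stands in for a global error hook that is not allowed to throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class FatalPredicateSketch {
    /** Errors handed to the non-throwing hook; stand-in for a global error handler. */
    static final List<Throwable> hooked = new ArrayList<>();

    /** Runs an action, rethrowing whatever the caller-supplied predicate calls fatal. */
    static void run(Runnable action, Predicate<Throwable> isFatal) {
        try {
            action.run();
        } catch (RuntimeException | Error t) {
            if (isFatal.test(t)) {
                throw t; // fatal by the developer's definition: let it travel up the stack
            }
            hooked.add(t); // non-fatal: report only, never rethrow
        }
    }
}
```

An app that wants failed precondition checks to crash could then pass a predicate such as t -> t instanceof IllegalArgumentException || t instanceof Error.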

Plus, we need to make sure that this logic is executed in all internal classes that catch Throwable. Otherwise we'll end up swallowing errors that shouldn't be swallowed.

Definitely.

> let the developer decide what fatal means

If the user provided Subscriber.onError throws an exception, it will always be treated as fatal (since onError itself failed). Due to this I don't see why anything further is needed. The only real reason Exceptions.throwIfFatal really exists is to avoid recursive loops and stack overflow. Otherwise everything gets routed to onError or onError itself throws and the exception is allowed to throw all the way up the stack.

> I believe part of the problem is ExecutorScheduler

Agreed. If scheduled work fails it has no choice but to throw an exception wherever it is, such as here:
https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/schedulers/ScheduledAction.java#L41

Work being scheduled should never throw but should instead propagate to onError. If the action being scheduled doesn't handle that correctly, though, the Scheduler itself does not have a reference to a Subscriber and thus can do nothing but throw. No operator should ever behave that way.

Makes sense. I'm not sure I understand what you're suggesting though. All our observables are scheduled, so whatever the subscriber does won't matter, as it will end up in this catch clause. Does that mean there is no way for us to achieve what I outlined?

I'm saying that the operators that do scheduling should do the error handling. For example, I see that observeOn doesn't catch and propagate to onError: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java#L139

If the operator doesn't do it, a user has no hope of managing errors.
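In other words, the code that hands work to a scheduler still holds the downstream reference, so it is the one place that can forward a failure. Below is a minimal sketch of that idea using a plain ExecutorService rather than an RxJava Scheduler; ScheduledForwardSketch, schedule and demo are hypothetical names, and catching every Throwable (including fatal ones) is a simplification.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class ScheduledForwardSketch {
    /** Schedules work and forwards any failure to the downstream error callback. */
    static void schedule(ExecutorService executor, Runnable work, Consumer<Throwable> onError) {
        executor.execute(() -> {
            try {
                work.run();
            } catch (Throwable t) {
                onError.accept(t); // the operator, unlike the executor, knows the downstream
            }
        });
    }

    /** Schedules a failing task and returns the error the downstream callback received. */
    static Throwable demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> received = new AtomicReference<>();
        schedule(executor,
                () -> { throw new IllegalArgumentException("precondition failed"); },
                received::set);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }
}
```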

Hi, my use case is to propagate an error from the Observable to the subscriber, but that error never reaches the subscriber's onError() method. Instead it always shows:
onError Must subsriber is called --- com.netflix.hystrix.exception.HystrixRuntimeException: CommandSR timed-out and no fallback available.
Below is the code from which I throw the real exception:

    @Override
    protected Observable<String> construct() {
        System.out.println("I am in construct of " + name + "::thread name: " + Thread.currentThread().getName());
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> observer) {
                try {
                    if (!observer.isUnsubscribed()) {
                        // a real example would do work like a network call here
                        String response = HTTPClientHelper.searchSR("https://stg-step.am.health.ge.com/service/v2/service-event11/search/SHOWROOM24", "gJc1Ts8PWBrCmi5PcTtZJTk6JmyS", "212469169");

                        observer.onNext(response + "!");
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    // any failure is replaced by a generic exception before it is signalled
                    System.out.println("Exception+++++++++++++++" + e.getMessage());
                    observer.onError(new RuntimeException("graceful onError"));
                }
            }
        });
    }

Even though you are using the 1.x create, the code signals the exception correctly. I'd assume something in Hystrix is suppressing your error, so I suggest asking on their page or, better yet, on Stack Overflow.

ok sure thanks

Sorry for reviving an old discussion again, but can someone help me?

> If the user provided Subscriber.onError throws an exception, it will always be treated as fatal (since onError itself failed).

Is this still viable for RxJava 2?
