Rxjs: If a hot observable's subscriber ignores errors, subsequent subscribers also ignore errors

Created on 20 Nov 2016 · 19 Comments · Source: ReactiveX/rxjs

If a subscriber subscribes to a hot observable with a next handler only (no error handler), then any subscribers that try to subscribe afterwards don't have their next handler called.

RxJS version:
[email protected]

Code to reproduce:
https://plnkr.co/edit/0CgdiUZ7iyIzzBKpoBTL?p=preview

var observable = Rx.Observable.throw('some error').share();

// This subscriber cares about errors and gets them
observable.subscribe(function(res) {
  console.log("first subscriber, next " + res);
}, function(err) {
  console.log("first subscriber, error " + err);
});

// This subscriber does not care about errors
observable.subscribe(function(res) {
  console.log("second subscriber, next " + res);
});

// This subscriber cares about errors, but never gets them
// because the second subscriber did not handle them
observable.subscribe(function(res) {
  console.log("third subscriber, next " + res);
}, function(err) {
  console.log("third subscriber, error " + err);
});

Expected behavior:
The first and third subscriber's error handler should be called, and this output should be logged:

first subscriber, error some error
third subscriber, error some error

Actual behavior:
Only the first subscriber's error handler is called; the third subscriber's error handler is not. The following is logged instead:

first subscriber, error some error
  Uncaught some error (Rx.js:831)
    Observable.subscribe @ Rx.js:831
    (anonymous function) @ script.js:11

Additional information:
Possibly related: #1420

Most helpful comment

Apologies I'm late to this party.

This is an issue called "producer interference". It's been dealt with in the TC39 proposal, and it's also already dealt with in master for RxJS 6. Unfortunately, it's not something we can fix in RxJS 5.x because it's a breaking change (albeit a small one).

Basically to fix this we have to break the following:

try {
  of(1).map(() => { throw new Error('lol') }).subscribe();
} catch (err) {
  // Double quotes are needed here because the message contains an apostrophe.
  console.log("this code should definitely be hit in RxJS 5.5, but won't in Rx 6");
}

The problem exists, as I'm sure some of you have figured out, because unhandled errors are immediately rethrown in RxJS. This means that they'll unwind the stack back to a loop that is notifying for a multicast, and break the loop.

The solution is quite simple, and makes sense for consistency's sake: just schedule the throw on a timeout.
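
As a rough illustration of the mechanism described above, here is a self-contained sketch that does not use RxJS at all. The names `createSource`, `runDemo`, and `pending` are hypothetical, and the "scheduled" throw is modeled with a plain task queue so the example stays deterministic; in real code the deferred variant would hand the task to something like `setTimeout(task, 0)`.

```javascript
// Hypothetical minimal multicast source. This is NOT RxJS's Subject,
// only a stand-in for "a loop that notifies several observers".
function createSource(reportUnhandled) {
  const observers = [];
  return {
    subscribe(observer) {
      observers.push(observer);
    },
    error(err) {
      // Notify each observer in subscription order.
      for (const observer of observers.slice()) {
        if (typeof observer.error === 'function') {
          observer.error(err);
        } else {
          // No error handler: delegate to the "unhandled error" policy.
          reportUnhandled(err);
        }
      }
    }
  };
}

function runDemo(reportUnhandled) {
  const seen = [];
  const source = createSource(reportUnhandled);
  source.subscribe({ error: e => seen.push('first: ' + e) });
  source.subscribe({}); // does not care about errors
  source.subscribe({ error: e => seen.push('third: ' + e) });
  try {
    source.error('some error');
  } catch (e) {
    seen.push('dispatch interrupted by: ' + e);
  }
  return seen;
}

// Policy 1: rethrow immediately. The throw unwinds through the loop above.
const immediate = runDemo(err => { throw err; });

// Policy 2: defer the throw. Here the task is only queued; a real
// implementation would pass it to setTimeout so it runs on a fresh call stack.
const pending = [];
const deferred = runDemo(err => { pending.push(() => { throw err; }); });

console.log(immediate); // [ 'first: some error', 'dispatch interrupted by: some error' ]
console.log(deferred);  // [ 'first: some error', 'third: some error' ]
```

With the immediate policy the third observer is never notified; with the deferred policy the loop finishes and the unhandled error is left waiting in `pending` to be thrown separately.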

The truth is that putting a try/catch around a subscription is silly, and accommodating it doesn't make any sense. So we've moved away from that potentially buggy behavior, and it actually cleaned up a lot of code for us.

FYI: The workaround for this is to use observeOn(asap) after any multicast if you're worried about this behavior happening to you.

All 19 comments

Edit: this particular comment isn't applicable, skip to https://github.com/ReactiveX/rxjs/issues/2145#issuecomment-349486823


Actually, I'm not sure this is a bug. What actually seems to be happening is that, if you don't provide an error handler, the error bubbles and the last subscriber doesn't even run.

var observable = Rx.Observable.throw('some error').share();

// This subscriber cares about errors and gets them
observable.subscribe(function(res) {
  console.log("first subscriber, next " + res);
}, function(err) {
  console.log("first subscriber, error " + err);
});

// This subscriber does not care about errors
observable.subscribe(function(res) {
  console.log("second subscriber, next " + res);
});

alert('!!! THIS CODE IS NEVER REACHED !!!');

// This subscriber cares about errors, but never gets them
// because it never actually runs!
observable.subscribe(function(res) {
  console.log("third subscriber, next " + res);
}, function(err) {
  console.log("third subscriber, error " + err);
});

This seems as-designed. If you don't catch errors, they propagate as normal exceptions would.

This example shows the bug better:

var observable = Rx.Observable.interval(200).do(function(i) {
  if (i === 2) {
    throw 'some error';
  }
}).share();

// This subscriber cares about errors and gets them
observable.subscribe(function(res) {
  console.log("first subscriber, next " + res);
}, function(err) {
  console.log("first subscriber, error " + err);
});

// This subscriber does not care about errors
observable.subscribe(function(res) {
  console.log("second subscriber, next " + res);
});

// This subscriber cares about errors, but never gets them
// because the second subscriber did not handle them
observable.subscribe(function(res) {
  console.log("third subscriber, next " + res);
}, function(err) {
  console.log("third subscriber, error " + err);
});

Output:

first subscriber, next 0
second subscriber, next 0
third subscriber, next 0
first subscriber, next 1
second subscriber, next 1
third subscriber, next 1
first subscriber, error some error
  Uncaught some error (Rx.js:3753)

Note that there is no "third subscriber, error some error" line.

@imgx64 Subscriber's default behavior is to rethrow unhandled errors. The alternative is swallowing them, and nobody wants that. An Observable synchronously emitting an error to a Subscriber without an error handler behaves the same as synchronously calling a function that throws without wrapping it in a try/catch.

The issue you linked to is a discussion about whether we should catch errors thrown in the end Subscriber's next handler and re-route them to the error handler, or if we should allow them to go uncaught. I was for it, because consumers could centralize all error handling logic in the error callback, instead of needing try/catches inside next and around the subscription point to handle the case where the Observable emits synchronously.

Even if we did that though, unhandled errors would still end up interrupting the execution for Subscribers without an error handler. Rx doesn't stray from any other implementation of the Subject/Observer pattern; errors thrown during dispatch interrupt dispatch. Can't really be helped.

@trxcllnt

"Subscriber's default behavior is to rethrow unhandled errors. The alternative is swallowing them, and nobody wants that. An Observable synchronously emitting an error to a Subscriber without an error handler behaves the same as synchronously calling a function that throws without wrapping it in a try/catch."

I understand about synchronous errors, and I've posted a second example that shows it applies for asynchronous errors.

"The issue you linked to is a discussion about whether we should catch errors thrown in the end Subscriber's next handler and re-route them to the error handler, or if we should allow them to go uncaught. I was for it, because consumers could centralize all error handling logic in the error callback, instead of needing try/catches inside next and around the subscription point to handle the case where the Observable emits synchronously."

The reason I mentioned that bug is this comment, which says that an unhandled error in one subscription terminates all other subscriptions to the same observable. This is what I'm seeing here which I think is unexpected behavior.

"Even if we did that though, unhandled errors would still end up interrupting the execution for Subscribers without an error handler. Rx doesn't stray from any other implementation of the Subject/Observer pattern; errors thrown during dispatch interrupt dispatch. Can't really be helped."

But this is interrupting the execution for subscribers with an error handler if a subscriber without an error handler happened to subscribe to the same observable.

I tried the same thing with RxJava and got what I expected:

Code:

Observable<Long> observable = Observable.interval(200, TimeUnit.MILLISECONDS).doOnNext((i) -> {
    if (i == 2) {
        throw new RuntimeException("some error");
    }
}).share();

observable.subscribe((res) -> {
    System.out.println("first subscriber, next " + res);
}, (err) -> {
    System.out.println("first subscriber, error " + err);
});

// This subscriber does not care about errors
observable.subscribe((res) -> {
    System.out.println("second subscriber, next " + res);
});

// This subscriber cares about errors, and gets them, as expected
observable.subscribe((res) -> {
    System.out.println("third subscriber, next " + res);
}, (err) -> {
    System.out.println("third subscriber, error " + err);
});

Output:

first subscriber, next 0
second subscriber, next 0
third subscriber, next 0
first subscriber, next 1
second subscriber, next 1
third subscriber, next 1
first subscriber, error java.lang.RuntimeException: some error
java.lang.RuntimeException: some error
    at com.example.Demo.lambda$0(Demo.java:53)
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
    at io.reactivex.internal.operators.observable.ObservableInterval$IntervalObserver.run(ObservableInterval.java:74)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Exception in thread "RxComputationThreadPool-1" java.lang.RuntimeException: some error
    at com.example.Demo.lambda$0(Demo.java:53)
    at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
    at io.reactivex.internal.operators.observable.ObservableInterval$IntervalObserver.run(ObservableInterval.java:74)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
third subscriber, error java.lang.RuntimeException: some error

Hey, we discussed this today at our meeting, notes here. We're going to keep the existing behavior for now (v4 had the same behavior), but TC39 is considering this exact problem in the context of DOM event streams, which are multicast as well. So we're going to wait and see what direction they want to go in.

The TC39 discussion on this is here: https://github.com/tc39/proposal-observable/issues/119

Worth noting - @blesh added a good blog post that discusses this issue

Has a final consensus been reached on this issue? If so what is it?

Thanks

@ccrowhurstram nope. any updates would be posted here.

In the TC39 thread (link above), the discussion was left at:

Errors thrown from observer notifications will be caught and reported to HostReportErrors, and will not close the subscription. This matches the behavior of Event Target and Iterable.iterate.

If the developer desires that the subscription be closed when there is an error in consumer code they can use forEach.

But this isn't the final word on it yet. It's too early in the spec process to have anything solidified and we don't want to change things in RxJS prematurely just to have to change them back a few months later.

OK thanks for the information

BTW, slightly off topic here but worth the cross reference...

The workaround that uses observeOn(Scheduler.asap) results in another problem in an Angular 2 app:

An error thrown by an operator like map will not be sent to the usual Angular 2 ErrorHandler service.

I've reported the issue with angular here: https://github.com/angular/angular/issues/14316

For working around this issue with errors, if I wanted to "percolate" errors to all the subscribers of a subject, would it be best to create a "safe" subject, as @benlesh's article pointed out was the issue? The sample workaround of using the async scheduler is not useful in this context, as the flows in use require "synchronized" execution on a "clock" (a la VirtualTime, like the TestScheduler).

I apologize if I oversimplified anything; I'm not overly familiar with rxjs at this point.

I had to write an interim function that checks whether a subscriber is missing an error callback and adds an empty one in that case. Quite an ugly approach.
When I subscribe to an observable, I expect either the next handler or the error handler to be called. A subscriber can't be responsible for previous subscribers not having an error handler defined.
I hope the discussion moves toward fixing error propagation so that errors flow through to all the subscribers, which (to me) seems the only reasonable solution.
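
A helper along those lines might look like the sketch below. It is not the commenter's code: the name `withDefaultErrorHandler` and its argument handling are assumptions made for illustration. It builds a plain observer object of the kind `subscribe` accepts and makes a missing error callback a no-op. Note that this swallows errors for that subscriber, which is exactly the trade-off criticized earlier in the thread.

```javascript
// Hypothetical helper: accepts either an observer object or the
// (next, error, complete) callbacks and returns an observer whose
// missing callbacks are no-ops, so a missing error handler never rethrows.
function withDefaultErrorHandler(nextOrObserver, error, complete) {
  const observer = nextOrObserver !== null && typeof nextOrObserver === 'object'
    ? nextOrObserver
    : { next: nextOrObserver, error: error, complete: complete };

  // Call the named callback only if the caller actually provided it.
  const call = (name, arg) => {
    if (typeof observer[name] === 'function') {
      observer[name](arg);
    }
  };

  return {
    next: value => call('next', value),
    error: err => call('error', err),
    complete: () => call('complete')
  };
}

const log = [];

// A next-only subscriber: the error is ignored instead of rethrown.
const nextOnly = withDefaultErrorHandler(value => log.push('next ' + value));
nextOnly.next(1);
nextOnly.error('some error');

// A subscriber with its own error handler keeps it.
const withError = withDefaultErrorHandler({ error: err => log.push('error ' + err) });
withError.error('some error');

console.log(log); // [ 'next 1', 'error some error' ]
```

Usage would then be something like `observable.subscribe(withDefaultErrorHandler(res => console.log(res)))`.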

@georgique the behavior in v5 is as intended, so this ticket could prolly be closed. In v6 errors from subscribers will not be rethrown, but they will still be reported to the console/window.onerror for debugging. That's very similar to the behavior of Promises. #3062

@jayphelps I might be misunderstanding things a bit, but initially the issue was not about errors in subscribers, was it? I have a pretty simple scenario: a service sending a login request to a server with wrong credentials. When a response is received, I have an Rx Subject broadcasting the result to subscribers, so different parts of the system can react to the login attempt. In my particular case I have three subscribers; two of them do not handle errors, as it's just not needed there. The third, though, has to handle the error, because it is the one showing the error to the end user. But the error never gets to the third subscriber just because the first two don't have error handlers. If that's intended behavior, then maybe at least make error handlers compulsory?
Rxjs version is 5.5.3

@georgique That's okay. Indeed it was about errors in subscribers. In v5 if you don't provide an error callback a default one is provided for you which just rethrows the error. In this context I mean "subscriber" to be the person who is subscribing with an observer that does not include an error callback. Here's a demo https://jsbin.com/ruhosup/edit?js,console,output

"If that's intended behavior, then maybe at least make error handlers compulsory?
Rxjs version is 5.5.3"

It's intended behavior for v1-v5, but won't be in v6. It won't be possible to change this or make error handlers required in v5 because it's a breaking change.

If you want to mitigate it, you could schedule the subscription to happen asynchronously using the .subscribeOn() operator. The AsapScheduler schedules on a microtask (e.g. like Promises) and the AsyncScheduler schedules on a macrotask (e.g. like setTimeout(0)).

I would use the AsapScheduler in this case IMO as the microtask will happen relatively faster.
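
To make the microtask/macrotask distinction concrete, here is a small sketch in plain JavaScript with no RxJS involved. A resolved Promise stands in for microtask scheduling and `setTimeout(..., 0)` for macrotask scheduling; the `order` and `finished` names are only for illustration.

```javascript
const order = [];

// Macrotask: runs on a later turn of the event loop.
const finished = new Promise(resolve => {
  setTimeout(() => {
    order.push('macrotask');
    resolve(order);
  }, 0);
});

// Microtask: runs as soon as the current synchronous code has finished.
Promise.resolve().then(() => order.push('microtask'));

// Synchronous code always runs first.
order.push('sync');

finished.then(result => console.log(result.join(' -> ')));
// prints "sync -> microtask -> macrotask"
```

Either kind of task starts with a fresh call stack, which is why an error rethrown there cannot unwind through another subscriber's notification.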

https://jsbin.com/lifehu/edit?js,console,output

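// RxJS 5.5 import paths
import { Subject } from 'rxjs/Subject';
import { asap } from 'rxjs/scheduler/asap';
import 'rxjs/add/operator/subscribeOn';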

// when anyone subscribes it won't happen synchronously, instead
// each will be scheduled on their own microtask
const items = (new Subject()).subscribeOn(asap);

items.subscribe({
  next: d => console.log('first ' + d),
  error: e => console.error('FIRST SUBSCRIBER SAW IT')
});

items.subscribe({
  next: d => console.log('second ' + d),
  // no error handler so one is provided for you that
  // will rethrow the error synchronously but the fact that we scheduled
  // using the AsapScheduler means that rethrown error will not affect anyone else
  // because it happens with its own fresh callstack.
});

items.subscribe({
  next: d => console.log('third ' + d),
  error: e => console.error('THIS IS ALSO CALLED, CORRECTLY')
});

items.next(1);
items.next(2);
items.error('bad stuff');

Keep in mind that this has potential negatives too: since your subscription doesn't happen synchronously, it becomes harder to reason about, which could introduce bugs. Technically it doesn't perform as well, relatively speaking, but the difference is negligible in most cases since you're not typically subscribing thousands of times per second.

Now clear. Thanks @jayphelps.

This was resolved with the release of v6 almost a year ago. Closing.
