RxJava: mergeDelayError not delaying errors when subscribing on schedulers

Created on 25 Aug 2015 · 18 Comments · Source: ReactiveX/RxJava

When two observables are merged with mergeDelayError and the resulting observable is subscribed on a scheduler, errors are not delayed. Example:

Observable.mergeDelayError(
        Observable.error(new RuntimeException()),
        Observable.just("Hello")
    )
    .subscribeOn(Schedulers.io())
    .subscribe(observer);

In this case "Hello" is never emitted. This is also mentioned in this SO question:
http://stackoverflow.com/questions/32131594/rx-java-mergedelayerror-not-working-as-expected

All 18 comments

Hello. If you wait for the terminal event then it works as expected:

@Test
public void errorDelayed() {
    TestSubscriber<Object> ts = TestSubscriber.create();
    Observable.mergeDelayError(
            Observable.error(new RuntimeException()),
            Observable.just("Hello")
        )
        .subscribeOn(Schedulers.io()).subscribe(ts);

    ts.awaitTerminalEvent();

    ts.assertError(RuntimeException.class);
    ts.assertValue("Hello");
}

Your example uses a TestSubscriber; a plain Subscriber does not have this method. Is there a way to make this work correctly in production code?

You can use toBlocking() to gain access to blocking operations such as forEach() that will wait on the current thread. But why do you want to block the main thread?

What I want is for mergeDelayError to delay errors when I'm subscribing on a scheduler; I don't want to block the main thread. The problem is that when I subscribe on a scheduler, the error emissions are not delayed and I don't get my onNext events: the emission stops at the first error. From the documentation I've read on mergeDelayError, this seems like a bug.

Could you post a unit test that demonstrates this problem? Are you running RxJava 1.0.14?

Updated from RxJava 1.0.3 to 1.0.14, the issue is gone. Thanks for your time =)

Great!

I'm still facing this issue using v.1.0.14.

A test that reproduces the case:

@Test public void testMergeDelayErrorWithOnErrorBeforeOnNext() {
    TestSubscriber<String> ts = new TestSubscriber<>();

    final Observable<String> errorObservable = Observable.error(new RuntimeException());

    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
      @Override public void call(Subscriber<? super String> subscriber) {
        try {
          //Simulate long operation
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        subscriber.onNext("1");
        subscriber.onNext("2");
        subscriber.onCompleted();
      }
    });

    Observable.mergeDelayError(errorObservable, observable)
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .subscribe(ts);

    ts.awaitTerminalEvent();
    ts.assertError(RuntimeException.class);
    ts.assertReceivedOnNext(Arrays.asList("1", "2"));
  }

Test output:

java.lang.AssertionError: Number of items does not match. Provided: 2  Actual: 1
    at rx.observers.TestObserver.assertReceivedOnNext(TestObserver.java:116)
    at rx.observers.TestSubscriber.assertReceivedOnNext(TestSubscriber.java:229)
    at testMergeDelayErrorWithOnErrorBeforeOnNext

Actually, I think it's a regression: with 1.0.12 the test passes.

I think what you see is the error cutting ahead in observeOn. Try applying doOnNext and doOnError after the mergeDelayError and look at the order in which they print.

@akarnokd but the point is that the test passes with v1.0.12 of Rx and fails with v1.0.14.

No, 1.0.14 exposes the bad assumption. Try this:

@Test 
public void testMergeDelayErrorWithOnErrorBeforeOnNext() {
    TestSubscriber<String> ts = new TestSubscriber<>();

    final Observable<String> errorObservable = Observable.error(new RuntimeException());

    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
      @Override public void call(Subscriber<? super String> subscriber) {
        try {
          //Simulate long operation
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        subscriber.onNext("1");
        subscriber.onNext("2");
        subscriber.onCompleted();
      }
    });

    Observable.mergeDelayError(errorObservable, observable)
        .materialize()
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .<String>dematerialize()
        .subscribe(ts);

    ts.awaitTerminalEvent();
    ts.assertError(RuntimeException.class);
    ts.assertReceivedOnNext(Arrays.asList("1", "2"));
}

By materializing before the observeOn, the exception is hidden and thus can't cut ahead.

Besides, what you are experiencing is a non-deterministic test. If I run the original, it passes. If I put it into a loop, it fails after 3-4 iterations.
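
To make the "hidden exception" idea concrete, here is a minimal plain-Java sketch. It is a toy model, not RxJava code: FailFastBoundary, direct(), materialized() and describe() are hypothetical names invented for illustration, and the boundary is drained on a single thread so the outcome is deterministic. It assumes a fail-fast boundary that drops queued items once it has seen an error, and shows that an error enqueued as ordinary data keeps its place in line.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of materialize/dematerialize around a fail-fast boundary; not RxJava code. */
public class MaterializeModel {

    /** Hypothetical fail-fast boundary: a pending error discards whatever is still queued. */
    static final class FailFastBoundary {
        final Queue<Object> queue = new ArrayDeque<>();
        Throwable error;

        void onNext(Object item) { queue.add(item); }
        void onError(Throwable t) { error = t; }

        /** Returns what survives the boundary, in delivery order. */
        List<Object> drain() {
            List<Object> out = new ArrayList<>();
            if (error != null) {
                queue.clear();   // the error cuts ahead of the queued items
                out.add(error);
            } else {
                out.addAll(queue);
                queue.clear();
            }
            return out;
        }
    }

    /** Events sent straight through: the error is a terminal signal. */
    static List<String> direct() {
        FailFastBoundary b = new FailFastBoundary();
        b.onNext("1");
        b.onNext("2");
        b.onError(new RuntimeException("boom"));
        return describe(b.drain());
    }

    /** Events "materialized" first: the error travels as an ordinary queued item. */
    static List<String> materialized() {
        FailFastBoundary b = new FailFastBoundary();
        b.onNext("1");
        b.onNext("2");
        b.onNext(new RuntimeException("boom")); // wrapped error, enqueued like a value
        return describe(b.drain());             // describe() plays the role of dematerialize
    }

    /** Turns delivered objects back into a readable log of signals. */
    static List<String> describe(List<Object> delivered) {
        List<String> log = new ArrayList<>();
        for (Object o : delivered) {
            if (o instanceof Throwable) {
                log.add("onError(" + ((Throwable) o).getMessage() + ")");
            } else {
                log.add("onNext(" + o + ")");
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(direct());       // [onError(boom)]
        System.out.println(materialized()); // [onNext(1), onNext(2), onError(boom)]
    }
}
```

In the real operator chain the wrapping and unwrapping are done by materialize() and dematerialize(); the model only illustrates why the order of signals is preserved.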

I'm on 1.2.0 already and mergeDelayError still behaves funny.
Sometimes the error is delayed, sometimes it is not.
If I print via doOnNext and doOnError, I see that one onNext comes in with data and then the error comes, but this onNext never reaches my subscriber for some reason.

And I wouldn't use TestSubscriber to test this situation. TestSubscriber has no notion of schedulers and makes operations blocking, so if the problem is related to threading, the test will not reveal it.

@AAverin You likely have to specify observeOn(scheduler, true) to delay the error across the observeOn as well; delaying with merge is not enough in this case.

@akarnokd wow, I had no idea observeOn had this argument. I didn't even think the observing thread had any say in whether to delay the error or not. Why should it care about this?

Also, how would this flag on observeOn affect my logic if there is no mergeDelayError further up in the chain of Rx calls?

Please provide a unit test that demonstrates your problem; otherwise I have to resort to guesswork.

@akarnokd observeOn(scheduler, true) seems to have fixed the problem, thanks.
Although I'm still confused about why observeOn cares about this. To me, it should not modify the order of events in any way, and if the chain of Rx calls delays the error, so should observeOn. So basically this flag shouldn't be present on the observeOn method in the first place.

The original ReactiveX design favored fail-fast streams. An async boundary such as observeOn has the opportunity to see both the elements of the source thread and the exception that came after in that thread. Since observeOn with the fail-fast behavior was very established when this property started causing confusion, we had no choice but to overload and parametrize the error handling behavior. I'd say you have it better because Rx.NET's suggested solution was to materialize and dematerialize the stream to overcome this property...
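
For readers who want to see the difference between the two behaviors, the following is a minimal plain-Java sketch of such a boundary. It is a simplified model under stated assumptions, not RxJava's actual observeOn implementation: BoundaryModel, run() and drain() are hypothetical names, and the upstream side finishes before the downstream side drains so the result is deterministic. The delayError constructor flag is meant to play the role of the boolean passed to observeOn(scheduler, true).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of an async boundary; NOT RxJava's actual observeOn code. */
public class BoundaryModel {
    private final Queue<String> queue = new ArrayDeque<>();
    private final boolean delayError;
    private Throwable error; // set by the upstream side
    private boolean done;

    public BoundaryModel(boolean delayError) {
        this.delayError = delayError;
    }

    // Upstream side: items are queued, terminal signals are only recorded.
    public void onNext(String item) { queue.add(item); }
    public void onError(Throwable t) { error = t; done = true; }
    public void onCompleted() { done = true; }

    /** Downstream side: returns a log of the signals the consumer would see. */
    public List<String> drain() {
        List<String> seen = new ArrayList<>();
        while (true) {
            // Fail-fast: a recorded error wins over items that are still queued.
            if (done && error != null && !delayError) {
                queue.clear();
                seen.add("onError(" + error.getMessage() + ")");
                return seen;
            }
            String item = queue.poll();
            if (item != null) {
                seen.add("onNext(" + item + ")");
                continue;
            }
            // Queue is empty: only now is a terminal signal delivered.
            if (done) {
                seen.add(error != null
                        ? "onError(" + error.getMessage() + ")"
                        : "onCompleted");
            }
            return seen;
        }
    }

    /** Upstream emits "1", "2", then an error, before the consumer gets to drain. */
    public static List<String> run(boolean delayError) {
        BoundaryModel b = new BoundaryModel(delayError);
        b.onNext("1");
        b.onNext("2");
        b.onError(new RuntimeException("boom"));
        return b.drain();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [onError(boom)]
        System.out.println(run(true));  // [onNext(1), onNext(2), onError(boom)]
    }
}
```

With the flag off, the consumer sees only the error even though the two items were emitted first, which matches the symptom reported in this thread; with the flag on, the queue is emptied before the error is delivered.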
