RxJava: 2.x Observable.combineLatestDelayError delivers error after completion

Created on 12 Jan 2017 · 6 Comments · Source: ReactiveX/RxJava

Observable.combineLatestDelayError sends an error event after the complete event has already been delivered, and that error is treated as unhandled.
Checked with rx.Observable, io.reactivex.Observable and Flowable.

    @Test
    public void testCombine() {
        rx.observers.TestSubscriber<Integer> testSubscriber = rx.observers.TestSubscriber.create();

        rx.Observable<Long> emptyObservable = rx.Observable.empty();
        rx.Observable<Object> errorObservable = rx.Observable.error(new Exception());

        rx.Observable.combineLatestDelayError(
                Arrays.asList(
                        emptyObservable
                                .doOnEach(integerNotification -> System.out.println("emptyObservable: " + integerNotification))
                                .doOnTerminate(() -> System.out.println("emptyObservable: doFinally")),
                        errorObservable
                                .doOnEach(integerNotification -> System.out.println("errorObservable: " + integerNotification))
                                .doOnTerminate(() -> System.out.println("errorObservable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doOnTerminate(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testSubscriber);

        testSubscriber.awaitTerminalEvent();
    }

    @Test
    public void testCombine2() {
        TestObserver<Integer> testObserver = TestObserver.create();

        Observable<Long> emptyObservable = Observable.empty();
        Observable<Object> errorObservable = Observable.error(new Exception());

        Observable.combineLatestDelayError(
                Arrays.asList(
                        emptyObservable
                                .doOnEach(integerNotification -> System.out.println("emptyObservable: " + integerNotification))
                                .doFinally(() -> System.out.println("emptyObservable: doFinally")),
                        errorObservable
                                .doOnEach(integerNotification -> System.out.println("errorObservable: " + integerNotification))
                                .doFinally(() -> System.out.println("errorObservable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testObserver);

        testObserver.awaitTerminalEvent();
    }

    @Test
    public void testCombine2Flowable() {
        TestSubscriber<Integer> testObserver = TestSubscriber.create();

        Flowable<Integer> emptyFlowable = Flowable.empty();
        Flowable<Object> errorFlowable = Flowable.error(new Exception());

        Flowable.combineLatestDelayError(
                Arrays.asList(
                        emptyFlowable
                                .doOnEach(integerNotification -> System.out.println("emptyFlowable: " + integerNotification))
                                .doFinally(() -> System.out.println("emptyFlowable: doFinally")),
                        errorFlowable
                                .doOnEach(integerNotification -> System.out.println("errorFlowable: " + integerNotification))
                                .doFinally(() -> System.out.println("errorFlowable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testObserver);

        testObserver.awaitTerminalEvent();
    }

Output:
testCombine

emptyObservable: [rx.Notification@2b4a2ec7 OnCompleted]
emptyObservable: doFinally
combineLatestDelayError: [rx.Notification@2b4a2ec7 OnCompleted]
combineLatestDelayError: doFinally

testCombine2

emptyObservable: OnCompleteNotification
combineLatestDelayError: OnCompleteNotification
combineLatestDelayError: doFinally
emptyObservable: doFinally
errorObservable: OnErrorNotification[java.lang.Exception]
errorObservable: doFinally
java.lang.Exception
    at com.myproject.Test.testCombine2(Test.java:298)
    // not really important Stacktrace
Exception in thread "main" java.lang.Exception
    at com.myproject.Test.testCombine2(Test.java:298)
    // repeat of not important Stacktrace

testCombine2Flowable

emptyFlowable: OnCompleteNotification
combineLatestDelayError: OnCompleteNotification
combineLatestDelayError: doFinally
emptyFlowable: doFinally

If the error emitter goes first, or if a timer is used instead of empty, everything works as expected.

I also noticed a difference in the order of events between 1.x and 2.x. Is that correct?

2.x Bug

All 6 comments

Hi.

1) doOnTerminate executes before the terminal event is emitted; it is not the same as doFinally, which executes after.
2) Flowable.combineLatest doesn't subscribe to the second source if the first terminates without any onNext item
3) Observable.combineLatest is supposed to work like 2) but apparently doesn't, hence the extra error.
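
The ordering in point 1) can be illustrated with a small plain-Java sketch. This is a toy model, not RxJava code: the `Downstream` interface and the `beforeTerminal` / `afterTerminal` helpers are hypothetical names invented here, standing in for doOnTerminate and doFinally respectively, and only the completion signal is modelled.

```java
import java.util.ArrayList;
import java.util.List;

final class TerminalHookOrder {
    /** Hypothetical minimal consumer of a completion signal. */
    interface Downstream {
        void onComplete();
    }

    /** Toy stand-in for doOnTerminate: run the hook, then forward the signal. */
    static Downstream beforeTerminal(Runnable hook, Downstream actual) {
        return () -> {
            hook.run();
            actual.onComplete();
        };
    }

    /** Toy stand-in for doFinally: forward the signal, then run the hook. */
    static Downstream afterTerminal(Runnable hook, Downstream actual) {
        return () -> {
            try {
                actual.onComplete();
            } finally {
                hook.run();
            }
        };
    }

    /** Builds a chain with both hooks and records the order in which things happen. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Downstream consumer = () -> log.add("consumer: onComplete");
        Downstream chain = beforeTerminal(
                () -> log.add("doOnTerminate hook"),
                afterTerminal(() -> log.add("doFinally hook"), consumer));
        chain.onComplete();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the "before" hook is logged ahead of the consumer's onComplete and the "after" hook behind it, which is why the 1.x test (doOnTerminate) and the 2.x tests (doFinally) print their lines in a different order.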

Thanks for the clarification. I mixed up the doOnTerminate and doAfterTerminate operators.

I still find the behaviour of Observable.combineLatest wrong. That was the main reason for this post.
Will it be fixed?

Yes, I'm working on the fix for 3).

I was just experimenting with combineLatestDelayError.
It looks like this case also throws an extra exception.

    @Test
    public void testCombine2Flowable2Errors() throws Exception {
        TestSubscriber<Integer> testObserver = TestSubscriber.create();

        TestScheduler testScheduler = new TestScheduler();

        Flowable<Integer> emptyFlowable = Flowable.timer(10, TimeUnit.MILLISECONDS, testScheduler)
                .flatMap(aLong -> Flowable.error(new Exception()));
        Flowable<Object> errorFlowable = Flowable.timer(100, TimeUnit.MILLISECONDS, testScheduler).map(aLong -> {
            throw new Exception();
        });

        Flowable.combineLatestDelayError(
                Arrays.asList(
                        emptyFlowable
                                .doOnEach(integerNotification -> System.out.println("emptyFlowable: " + integerNotification))
                                .doFinally(() -> System.out.println("emptyFlowable: doFinally")),
                        errorFlowable
                                .doOnEach(integerNotification -> System.out.println("errorFlowable: " + integerNotification))
                                .doFinally(() -> System.out.println("errorFlowable: doFinally"))),
                objects -> 0
        )
                .doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
                .doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
                .subscribe(testObserver);

        testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);

        testObserver.awaitTerminalEvent();
    }

Output

emptyFlowable: OnErrorNotification[java.lang.Exception]
combineLatestDelayError: OnErrorNotification[java.lang.Exception]
combineLatestDelayError: doFinally
emptyFlowable: doFinally
errorFlowable: OnErrorNotification[java.lang.Exception]
errorFlowable: doFinally
java.lang.Exception
    at com.myproject.Test.testCombine2Flowable2Errors(Test.java:298)
    at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:62)
    at io.reactivex.internal.operators.flowable.FlowableTimer$TimerSubscriber.run(FlowableTimer.java:76)
    at io.reactivex.Scheduler$1.run(Scheduler.java:134)
    at io.reactivex.schedulers.TestScheduler.triggerActions(TestScheduler.java:115)
    // not really important Stacktrace
Exception in thread "main" java.lang.Exception
    at com.myproject.Test.testCombine2Flowable2Errors(Test.java:298)
    at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:62)
    at io.reactivex.internal.operators.flowable.FlowableTimer$TimerSubscriber.run(FlowableTimer.java:76)
    at io.reactivex.Scheduler$1.run(Scheduler.java:134)
    at io.reactivex.schedulers.TestScheduler.triggerActions(TestScheduler.java:115)
    // repeat of not important Stacktrace

Without the timers it works as expected.

Yes, it looks like the other source is not cancelled in time.

I've posted the fix PR as #4987 that resolves 3) and this latest case.
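
Why a source that is "not cancelled in time" produces an extra exception can be sketched with a plain-Java toy model. This is not the code from the PR and not RxJava internals: `Coordinator`, `sourceError` and `simulate` are hypothetical names, and the model only assumes that a terminated downstream can no longer receive signals, so a later error has nowhere to go (in RxJava 2.x such undeliverable errors end up in RxJavaPlugins.onError).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LateErrorModel {
    /** Hypothetical coordinator sitting between several sources and one downstream. */
    static final class Coordinator {
        final boolean cancelOthersOnTerminate;
        final boolean[] cancelled;
        final List<String> log = new ArrayList<>();
        boolean terminated;

        Coordinator(int sourceCount, boolean cancelOthersOnTerminate) {
            this.cancelled = new boolean[sourceCount];
            this.cancelOthersOnTerminate = cancelOthersOnTerminate;
        }

        /** Called when the source at the given index fails. */
        void sourceError(int index, String error) {
            if (cancelled[index]) {
                return; // a cancelled source delivers nothing further
            }
            if (terminated) {
                // Downstream already got its terminal event: the error is orphaned.
                log.add("undeliverable: " + error);
                return;
            }
            terminated = true;
            log.add("downstream onError: " + error);
            if (cancelOthersOnTerminate) {
                Arrays.fill(cancelled, true);
            }
        }
    }

    /** Two sources fail one after the other, as in the timer-based test above. */
    static List<String> simulate(boolean cancelOthersOnTerminate) {
        Coordinator c = new Coordinator(2, cancelOthersOnTerminate);
        c.sourceError(0, "first");  // e.g. the 10 ms source
        c.sourceError(1, "second"); // e.g. the 100 ms source
        return c.log;
    }

    public static void main(String[] args) {
        System.out.println("no cancel:   " + simulate(false));
        System.out.println("with cancel: " + simulate(true));
    }
}
```

Without cancellation the second failure is recorded as undeliverable, matching the extra stack trace in the output above. With cancellation on termination, only the first error is seen.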

Closing via #4987
