Observable.combineLatestDelayError sends error event after complete event happened and treated as unhandled.
Checked on rx.Observable, io.reactivex.Observable and Flowable
@Test
public void testCombine() {
rx.observers.TestSubscriber<Integer> testSubscriber = rx.observers.TestSubscriber.create();
rx.Observable<Long> emptyObservable = rx.Observable.empty();
rx.Observable<Object> errorObservable = rx.Observable.error(new Exception());
rx.Observable.combineLatestDelayError(
Arrays.asList(
emptyObservable
.doOnEach(integerNotification -> System.out.println("emptyObservable: " + integerNotification))
.doOnTerminate(() -> System.out.println("emptyObservable: doFinally")),
errorObservable
.doOnEach(integerNotification -> System.out.println("errorObservable: " + integerNotification))
.doOnTerminate(() -> System.out.println("errorObservable: doFinally"))),
objects -> 0
)
.doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
.doOnTerminate(() -> System.out.println("combineLatestDelayError: doFinally"))
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
}
@Test
public void testCombine2() {
TestObserver<Integer> testObserver = TestObserver.create();
Observable<Long> emptyObservable = Observable.empty();
Observable<Object> errorObservable = Observable.error(new Exception());
Observable.combineLatestDelayError(
Arrays.asList(
emptyObservable
.doOnEach(integerNotification -> System.out.println("emptyObservable: " + integerNotification))
.doFinally(() -> System.out.println("emptyObservable: doFinally")),
errorObservable
.doOnEach(integerNotification -> System.out.println("errorObservable: " + integerNotification))
.doFinally(() -> System.out.println("errorObservable: doFinally"))),
objects -> 0
)
.doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
.doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
.subscribe(testObserver);
testObserver.awaitTerminalEvent();
}
@Test
public void testCombine2Flowable() {
TestSubscriber<Integer> testObserver = TestSubscriber.create();
Flowable<Integer> emptyFlowable = Flowable.empty();
Flowable<Object> errorFlowable = Flowable.error(new Exception());
Flowable.combineLatestDelayError(
Arrays.asList(
emptyFlowable
.doOnEach(integerNotification -> System.out.println("emptyFlowable: " + integerNotification))
.doFinally(() -> System.out.println("emptyFlowable: doFinally")),
errorFlowable
.doOnEach(integerNotification -> System.out.println("errorFlowable: " + integerNotification))
.doFinally(() -> System.out.println("errorFlowable: doFinally"))),
objects -> 0
)
.doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
.doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
.subscribe(testObserver);
testObserver.awaitTerminalEvent();
}
Output:
testCombine
emptyObservable: [rx.Notification@2b4a2ec7 OnCompleted]
emptyObservable: doFinally
combineLatestDelayError: [rx.Notification@2b4a2ec7 OnCompleted]
combineLatestDelayError: doFinally
testCombine2
emptyObservable: OnCompleteNotification
combineLatestDelayError: OnCompleteNotification
combineLatestDelayError: doFinally
emptyObservable: doFinally
errorObservable: OnErrorNotification[java.lang.Exception]
errorObservable: doFinally
java.lang.Exception
at com.myproject.Test.testCombine2(Test.java:298)
// not really important Stacktrace
Exception in thread "main" java.lang.Exception
at com.myproject.Test.testCombine2(Test.java:298)
// repeat of not important Stacktrace
testCombine2Flowable
emptyFlowable: OnCompleteNotification
combineLatestDelayError: OnCompleteNotification
combineLatestDelayError: doFinally
emptyFlowable: doFinally
If error emitter goes first or add some timer instead of empty, then everything is ok.
Also noticed difference in events order between 1.x and 2.x. Is it correct?
Hi.
1) doOnTerminate executes before the terminal event is emitted, also != 藳doFinally which executes after.
2) Flowable.combineLatest doesn't subscribe to the second source if the first terminates without any onNext item
3) Observable.combineLatest is supposed to work like 2) but apparently doesn't, hence the extra error.
Thanks for clarification. Mixed doOnTerminate and doAfterTerminate operators.
Still I find behaviour of Observable.combineLatest wrong. That was main reason of this post.
Will it be fixed?
Yes, I'm working on the fix for 3).
Just was experimenting with combineLatestDelayError
Looks like this case also throws extra exception.
@Test
public void testCombine2Flowable2Errors() throws Exception {
TestSubscriber<Integer> testObserver = TestSubscriber.create();
TestScheduler testScheduler = new TestScheduler();
Flowable<Integer> emptyFlowable = Flowable.timer(10, TimeUnit.MILLISECONDS, testScheduler)
.flatMap(aLong -> Flowable.error(new Exception()));
Flowable<Object> errorFlowable = Flowable.timer(100, TimeUnit.MILLISECONDS, testScheduler).map(aLong -> {
throw new Exception();
});
Flowable.combineLatestDelayError(
Arrays.asList(
emptyFlowable
.doOnEach(integerNotification -> System.out.println("emptyFlowable: " + integerNotification))
.doFinally(() -> System.out.println("emptyFlowable: doFinally")),
errorFlowable
.doOnEach(integerNotification -> System.out.println("errorFlowable: " + integerNotification))
.doFinally(() -> System.out.println("errorFlowable: doFinally"))),
objects -> 0
)
.doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
.doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
.subscribe(testObserver);
testScheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
testObserver.awaitTerminalEvent();
}
Output
emptyFlowable: OnErrorNotification[java.lang.Exception]
combineLatestDelayError: OnErrorNotification[java.lang.Exception]
combineLatestDelayError: doFinally
emptyFlowable: doFinally
errorFlowable: OnErrorNotification[java.lang.Exception]
errorFlowable: doFinally
java.lang.Exception
at com.myproject.Test.testCombine2Flowable2Errors(Test.java:298)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:62)
at io.reactivex.internal.operators.flowable.FlowableTimer$TimerSubscriber.run(FlowableTimer.java:76)
at io.reactivex.Scheduler$1.run(Scheduler.java:134)
at io.reactivex.schedulers.TestScheduler.triggerActions(TestScheduler.java:115)
// not really important Stacktrace
Exception in thread "main" java.lang.Exception
at com.myproject.Test.testCombine2Flowable2Errors(Test.java:298)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:62)
at io.reactivex.internal.operators.flowable.FlowableTimer$TimerSubscriber.run(FlowableTimer.java:76)
at io.reactivex.Scheduler$1.run(Scheduler.java:134)
at io.reactivex.schedulers.TestScheduler.triggerActions(TestScheduler.java:115)
// repeat of not important Stacktrace
Without timeouts it works as expected
Yes, it looks like the other source is not cancelled in time.
I've posted the fix PR as #4987 that resolves 3) and this latest case.
Closing via #4987
Most helpful comment
Yes, it looks like the other source is not cancelled in time.
I've posted the fix PR as #4987 that resolves 3) and this latest case.