Rxjava: zip: doOnTerminate is not called on some observables

Created on 30 Jul 2015  路  14Comments  路  Source: ReactiveX/RxJava

    @Test
    public void test() {
        Observable.zip(
                Observable.just("1")
                        .doOnTerminate(() -> System.out.println("TERMINATE 1")),
                Observable.just("2")
                        .delay(1, TimeUnit.SECONDS)
                        .doOnTerminate(() -> System.out.println("TERMINATE 2")),
                (result1, result2) -> null)
                .doOnTerminate(() -> System.out.println("TERMINATE"))
                .toBlocking()
                .single();
    }

Output:

TERMINATE 1
TERMINATE

Expected output:

TERMINATE 1
TERMINATE 2
TERMINATE

Or maybe my thinking is wrong?

Context of the problem: I use Hystrix observable commands in zip:

Observable.zip(
  new MyHystrixObservableCommand(arg1).toObservable(),
  new MyHystrixObservableCommand(arg2).toObservable(),
  (result1, result2) -> null)
.toBlocking()
.single()

and hystrix command semaphore release happens in doOnTerminate. One of them is not called and semaphore is not released.

Question

All 14 comments

Once the second delayed value runs the zip, it detects that the first source has terminated, completes and unsubscribes the second source. At this time, there is an onCompleted scheduled by the second source which due to the unsubscription won't be executed and you don't see the doOnTerminate called.

I felt back to merge:

        Observable.merge(
                Observable.just("1")
                        .doOnTerminate(() -> System.out.println("TERMINATE 1")),
                Observable.just("2")
                        .delay(1, TimeUnit.SECONDS)
                        .doOnTerminate(() -> System.out.println("TERMINATE 2")))
                .doOnTerminate(() ->
                        System.out.println("TERMINATE"))
                .ignoreElements()
                .singleOrDefault(null)
                .toBlocking()
                .single();
TERMINATE 1
TERMINATE 2
TERMINATE

This code looks strange to me. Please, advice how to properly run some observables in parallel and wait for them to finish. I can go with merge for now because I do not need observable results.

You can use lastOrDefault to shorten your codes, e.g., o1.mergeWith(o2).toBlocking().lastOrDefault(null).

@vleushin Is your issue resolved ? I am facing same issue . What is the drawback of using merge ?

I use merge. Drawback of merge is that it's hard to combine results, if you need them. If you don't need them (like in my case), you can be good with merge

I just encountered this same thing. I want to zip together two Observables and do some simple logging on each when complete. You don't even need the delay to make this happen.

        Observable<Integer> nums = Observable.just(1, 2, 3, 4)
                .doOnCompleted(() -> System.out.println("done with nums"));
        Observable<String> letters = Observable.just("a", "b", "c", "d")
                .doOnCompleted(() -> System.out.println("done with letters")); // this won't appear

        nums.zipWith(letters, (n, l) -> "got " + n + " and " + l)
                .toBlocking()
                .forEach(System.out::println);

I found this behavior to be very surprising. Could zip be changed to allow the source Observables to fully complete in the case where they are the same length?

I'd add doOnUnsubscribe so you can execute the cleanup action or simply use using that will execute the cleanup if the source completes normally or is cancelled.

Why is this closed? I think the current zip behavior is incorrect.
My point is, if inner observables of zip emit the same number of items, then onCompleted (doOnCompleted, doOnTerminate etc.) should be called on each of them.

DoOnTerminate won't be called if the observable is unsubscribed before the onCompleted/onError. Use doOnUnsubscribe if you need to always get a call back.

That is what I'm talking about.
Why unsubscribe just before completion? An event is lost without a reason.
Also there is an obvious inconsistency: you have two identical inner observables, the first completes, then unsubscribes, the second one unsubscribes before completion, even though it was about to do so.

With collections over time, you can't know you are just before completion. The zip operator behaves correctly and you need a different operator, doOnTerminate+doOnUnsubscribe or using, to handle completion and unsubscription case together.

It is clear to me now. Thank you all for clarifications. I think we can close this issue.

@abersnaze doOnUnsubscribe can be an acceptable workaround, but what if I don't want it to trigger on error?

@akarnokd Formally, zip behaves correctly, since it is not documented whether onCompletes (and so on) should be called on inner observables.
The fact is the behavior is unspecified and unpredictable. The problem is, I want to know exactly what does my code do.

See the proposed documentation changes in #3981.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

nltran picture nltran  路  4Comments

perlow picture perlow  路  3Comments

dimsuz picture dimsuz  路  4Comments

hoc081098 picture hoc081098  路  3Comments

Jaap-van-Hengstum picture Jaap-van-Hengstum  路  3Comments