RxJava: Observable#repeatWhen

Created on 18 Apr 2015  ·  10 Comments  ·  Source: ReactiveX/RxJava

I'm trying to leverage the repeatWhen operator, but its behavior doesn't seem to be following the documentation.
I'm probably not using it correctly, but I can't see exactly what I'm doing wrong, and I also noticed that this operator is lacking test coverage.

final AtomicInteger i = new AtomicInteger(0);
final Observable<?> timer = Observable.timer(100, TimeUnit.MILLISECONDS)
        .take(6)
        .cache();

final Observable<Integer> result = Observable.defer(() -> Observable.just(i.getAndIncrement()))
        .repeatWhen(observable -> timer)
        .cache();

result
        .subscribe(System.out::println);

// wait for result to complete.
result
        .toList()
        .toBlocking()
        .first();

I would expect this to print 0, 1, 2, 3, 4, 5 and then complete, but instead result emits only 0 and then completes.

Note that what I'm trying to accomplish is more complex than this (this example in particular could be implemented with timer() + map()).
In my example the observable returned from repeatWhen is a subject to which I send values to make the resulting Observable repeat itself, but I simplified this for illustration purposes.

Could somebody point to what I'm doing wrong, or whether there's a better way to implement what I described?

Thank you.

All 10 comments

Your first observable does not emit an onError notification, so repeatWhen will never kick in ;) (it resubscribes only on an onError notification)

Isn't that only relevant for retryWhen?

In any case, I realized that a much simpler solution for my problem is using flatMap, though I'd still like to understand the semantics of this operator.

Thanks for your reply!

This appears to be a bug with repeatWhen (I can't find any unit test for this operator).

Edit: quick clarification: Observable.timer(100, TimeUnit.MILLISECONDS) fires only once so you should see only 1 repetition.

I've looked into repeatWhen and it is unclear to me how the returned observable from the user function should affect the resubscription. It seems you can't dismiss the incoming observable and return something independent because the returned observable timer will be subscribed to immediately. My guess is that you need to map/flatMap over the observable to return a possible delay for the resubscription:

AtomicInteger c = new AtomicInteger();
Observable.just(1)
    .repeatWhen(o -> o.flatMap(v -> {
        if (c.getAndIncrement() == 0) {
            return Observable.just(1);
        }
        return Observable.empty();
    }))
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));

But this never prints "Done".

/cc @benjchristensen @stealthcode

@akarnokd, for me your example is working as expected.

If you flatMap the observable o given by repeatWhen, the result emits everything from the inner observables but completes only when o completes. If you keep feeding it empty observables, it won't complete. It won't repeat either, because it repeats only when you emit something.

This will print "Done", but only one "1", of course:

AtomicInteger c = new AtomicInteger();
Observable.just(1)
    .repeatWhen(o -> o.flatMap(v -> {
        if (c.getAndIncrement() == 0) {
            return Observable.just(1);
        }
        return Observable.empty();
    }).take(1))
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));

Unless I'm missing something, that's the intended behavior, right?

@NachoSoto Your usage of repeatWhen should map the given observable and return it. You are not seeing any repeats because you are throwing away the feedback loop, so we are unable to subscribe to it. Instead we are subscribing to your timer observable. Here is a functioning example with the desired delay.

CountDownLatch l = new CountDownLatch(50);
Observable.defer(() -> Observable.just(i.getAndIncrement()))
        .repeatWhen(observable -> observable.delay(100, TimeUnit.MILLISECONDS))
        .subscribe((t) -> {
            System.out.println(t);
            l.countDown();
        });
l.await();

Oh and one more thing. You can get the desired effect of final Observable<?> timer = Observable.timer(100, TimeUnit.MILLISECONDS).take(6) with the following.

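CountDownLatch l = new CountDownLatch(50);
Observable.defer(() -> Observable.just(i.getAndIncrement()))
        .repeatWhen(observable -> observable.delay(100, TimeUnit.MILLISECONDS).take(6))
        .subscribe((t) -> {
            System.out.println(t);
            l.countDown();
        });
// take(6) ends the stream after a handful of items, so the latch never reaches 0;
// bound the wait instead of blocking forever.
l.await(2, TimeUnit.SECONDS);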

output

0
1
2
3
4
5
6

@akarnokd The case you point out seems like a bug to me. The final subscriber after the repeatWhen should always receive the onCompleted. Emitting an empty observable from the notificationHandler in a repeatWhen will propagate the onCompleted event to the child subscriber, effectively terminating the observable chain. It's interesting that when I change the flatMap to a takeWhile, the subscriber does execute its onCompleted.

AtomicInteger c = new AtomicInteger();
Observable.just(1)
    .repeatWhen(o -> o.takeWhile((v -> c.getAndIncrement() == 0)))
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));

output

1
1
Done

/cc @davidmoten is there a unit test covering this use case in #2997?

I now understand the intent behind this operator, and @stealthcode's example with delay works as expected.
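
For anyone landing here later, the two small cases discussed above (takeWhile printing "Done", flatMap with an empty observable never printing it) can be reproduced with a toy model in plain Java. This is only a sketch of the rule as described in this thread, not RxJava's implementation: the class RepeatWhenModel, the Signal enum and the maxRuns cap are all hypothetical names made up for illustration. It assumes the source emits exactly one item per subscription, and it reduces the notification handler to a function that, for the n-th completion of the source, either emits (resubscribe), completes (terminate downstream), or stays silent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.Supplier;

// Toy model of the repeatWhen feedback loop (hypothetical, not RxJava code).
public class RepeatWhenModel {

    // What the handler's output does in response to one source completion.
    public enum Signal { RESUBSCRIBE, COMPLETE, SILENT }

    // Runs the source, asks the handler after each completion, and logs
    // every emitted item plus "Done" if the downstream gets onCompleted.
    // maxRuns is an artificial cap so an always-RESUBSCRIBE handler ends.
    public static List<String> run(Supplier<Integer> source,
                                   IntFunction<Signal> handler,
                                   int maxRuns) {
        List<String> log = new ArrayList<>();
        for (int n = 1; n <= maxRuns; n++) {
            log.add(String.valueOf(source.get())); // one subscription, one item
            Signal s = handler.apply(n);           // handler sees the n-th completion
            if (s == Signal.COMPLETE) {
                log.add("Done");                   // completion reaches the subscriber
                return log;
            }
            if (s == Signal.SILENT) {
                return log;                        // stalled: no repeat and no "Done"
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // Like takeWhile(c.getAndIncrement() == 0): pass once, then complete.
        System.out.println(run(() -> 1,
                n -> n == 1 ? Signal.RESUBSCRIBE : Signal.COMPLETE, 10)); // [1, 1, Done]
        // Like flatMap returning just(1) once and empty() afterwards.
        System.out.println(run(() -> 1,
                n -> n == 1 ? Signal.RESUBSCRIBE : Signal.SILENT, 10));   // [1, 1]
    }
}
```

The model deliberately leaves out timing and the case where the handler emits and completes on the same notification (as with take(n)), since the comments above report different results for that case.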
