RxJava: Subscriber's onCompleted is not called when returning Observable.empty() in retryWhen()

Created on 25 Nov 2015 · 11 Comments · Source: ReactiveX/RxJava

I was testing out RxJava's retryWhen() function and this problem occurred.

The retryWhen() docs say:

Returns an Observable that emits the same values as the source observable with the exception of an onError. An onError notification from the source will result in the emission of a Throwable item to the Observable provided as an argument to the notificationHandler function. If that Observable calls onComplete or onError then retry will call onCompleted or onError on the child subscription. Otherwise, this Observable will resubscribe to the source Observable.

When I return Observable.empty() in flatMap(), the subscriber's onCompleted never gets called, even though the Observable.empty() docs say:

Returns an Observable that emits no items to the Observer and immediately invokes its onCompleted method.

If I replace Observable.empty() with Observable.error(new Exception("New Exception!")), the new exception is delivered to the subscriber's onError(), which is correct according to the retryWhen() docs.

Following is my code:

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(10);
                sleep(1000);

                subscriber.onNext(20);
                sleep(1000);

                subscriber.onError(new Exception("My Exception!"));
            }
        }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        log("throwable = " + throwable.getMessage());
                        return Observable.empty();
//                        return Observable.error(new Exception("New Exception!"));
                    }
                });
            }
        }).subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                log("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                log("onError: " + e.getMessage());
            }

            @Override
            public void onNext(Object integer) {
                log("onNext: " + integer);
            }
        });

    private void sleep(long duration) {
        try {
            Thread.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void log(String s) {
        Log.e("linhln", s);
    }

All 11 comments

From a cursory debug walkthrough, it looks like the merge is somehow swallowing the onCompleted in the inner subscriber. I'll look at how OperatorRedo subscribes to the controlHandlerFunction. In the meantime, you could use something like takeWhile, which will call onCompleted without having to merge.

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(10);
        sleep(1000);

        subscriber.onNext(20);
        sleep(1000);

        subscriber.onError(new Exception("My Exception!"));
    }
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> call(Observable<? extends Throwable> notifications) {
        return notifications
            .takeWhile(new Func1<Throwable, Boolean>(){
                @Override
                public Boolean call(Throwable throwable){
                    log("throwable = " + throwable);
                    return throwable instanceof IllegalStateException;
                }});
    }})
.subscribeOn(Schedulers.newThread())
.toBlocking()
.forEach(new Action1<Integer>() {
    @Override
    public void call(Integer x) {
        log("onNext: " + x);
    }
});
log("complete");

Console:

onNext: 10
onNext: 20
throwable = java.lang.Exception: My Exception!
complete

Actually I'm not trying to achieve anything but testing out the retryWhen() operator.
Anyway, thank you for the code and confirmation that this is a bug.

Hi. What happens is that the Observable passed to your callback never calls onCompleted, so flatMap keeps going even though the inner Observable you return completes immediately. This is the expected behavior. Therefore, you need a set of operations that end in onCompleted naturally, such as the example from @stealthcode. Maybe a shorter trick would be:

.retryWhen(e -> e.take(1).ignoreElements())

Oh I see. Because the observable passed in as the argument could still emit a new Throwable (that may result in a flatMap of more emissions). Yes this makes more sense. You would have to complete the outer observable in order to ensure that no further inner observables can be merged.
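
To make the point above concrete, here is a toy model in plain Java of the completion rule that a merge-style operator such as flatMap follows. It is not RxJava code, and the class and method names are made up for illustration. The rule it encodes is that downstream completes only after the outer stream has completed and every inner stream has completed. In the question's code the outer stream of errors stays open, so an inner Observable.empty() finishing is not enough.

```java
/**
 * Toy model (hypothetical, not RxJava's implementation) of when a
 * merge/flatMap stage forwards onCompleted downstream.
 */
final class FlatMapCompletionModel {
    private boolean outerDone;      // has the outer (error-notification) stream completed?
    private int activeInners;       // inner streams subscribed but not yet completed
    private boolean downstreamDone; // has completion been forwarded downstream?

    /** The outer stream emitted an item, so a new inner stream is subscribed. */
    void innerSubscribed() {
        activeInners++;
    }

    /** An inner stream (for example Observable.empty()) completed. */
    void innerCompleted() {
        activeInners--;
        tryComplete();
    }

    /** The outer stream itself completed (for example after take(1) or takeWhile). */
    void outerCompleted() {
        outerDone = true;
        tryComplete();
    }

    boolean isDownstreamCompleted() {
        return downstreamDone;
    }

    // Completion is forwarded only when both conditions hold.
    private void tryComplete() {
        if (outerDone && activeInners == 0) {
            downstreamDone = true;
        }
    }
}
```

Calling innerSubscribed() and then innerCompleted() leaves isDownstreamCompleted() false. Only a later outerCompleted(), which is what take(1) or takeWhile provide on the notification stream, flips it to true.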

@akarnokd @abersnaze If we begin to create observables that have limited operator sets then it would be good to pass in an observable that did not have flatMap to the notification handler (in 2.x of course).

Hi @akarnokd, I'm quite new to Reactive so I don't get your trick.

So to make it simpler, I have this piece of code.
The problem is that onCompleted is not called despite returning Observable.empty(). Could you please show me how to apply the trick so that onCompleted gets called?

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(10);
                sleep(1000);

                subscriber.onNext(20);
                sleep(1000);

                subscriber.onError(new Exception("My Exception!"));
            }
        })
        .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            int count = 0;
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++count < 3) {
                            log(String.format("Because of %s: retrying after %d second(s)...", throwable.getMessage(), count));
                            return Observable.timer(count, TimeUnit.SECONDS);
                        }

                        // just complete
                        return Observable.empty();
                    }
                });
            }
        })
        .subscribeOn(Schedulers.newThread()).subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                log("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                log("onError: " + e.getMessage());
            }

            @Override
            public void onNext(Object integer) {
                log("onNext: " + integer);
            }
        });

If you have this counted retry, my suggestion won't work and you need to use @stealthcode's suggestion:

        .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
            int count = 0;
            @Override
            public Observable<?> call(Observable<? extends Throwable> observable) {
                return observable
                        // completes the notification stream once the predicate returns false
                        .takeWhile(e -> count++ < 3)
                        .flatMap(e -> Observable.timer(count, TimeUnit.SECONDS));
            }
        })

@akarnokd Thank you, it works, I got the idea as well.
But I guess it should be .takeWhile(e -> count++ < 3) instead of .takeWhile(e -> count < 3)

Right. Updated the code example.
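
For anyone comparing the three counter forms mentioned in this thread, here is a small plain-Java sketch (no RxJava; the class and method names are hypothetical) that counts how many errors each predicate lets through before it first returns false. That is the point where takeWhile would complete the notification stream. A cap of 100 simulated errors stands in for "never stops".

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper that compares the counter predicates discussed above. */
final class TakeWhileCountDemo {
    /**
     * Evaluates the predicate once per simulated error and returns how many
     * errors pass before it first returns false (takeWhile semantics).
     * Returns maxErrors if the predicate never returns false within the cap.
     */
    static int retriesAllowed(BooleanSupplier predicate, int maxErrors) {
        int passed = 0;
        while (passed < maxErrors && predicate.getAsBoolean()) {
            passed++;
        }
        return passed;
    }

    /** The form used in the question: ++count < 3. */
    static int preIncrement() {
        int[] count = {0};
        return retriesAllowed(() -> ++count[0] < 3, 100);
    }

    /** The corrected form: count++ < 3. */
    static int postIncrement() {
        int[] count = {0};
        return retriesAllowed(() -> count[0]++ < 3, 100);
    }

    /** The form without an increment: count < 3, which never becomes false. */
    static int noIncrement() {
        int[] count = {0};
        return retriesAllowed(() -> count[0] < 3, 100);
    }
}
```

Here preIncrement() allows 2 retries, postIncrement() allows 3, and noIncrement() hits the cap because the counter never changes. Assuming takeWhile and flatMap handle each error synchronously, the count++ form also means flatMap reads count after the increment, so the timer delays would be 1, 2, and 3 seconds.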

You can also use scan to introduce a stateful count and then takeWhile (if you
favor composition over closing over state).
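
As a rough illustration of the scan-then-takeWhile idea, here is a plain-Java stand-in that works on lists instead of Observables. The helper names scan, takeWhile, and retryAttempts are made up for this sketch and are not RxJava operators. Note that this scan helper does not emit the seed, whereas RxJava's scan with an initial value emits it first, so a real pipeline would likely need a skip(1) or an adjusted predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;

/** Hypothetical list-based sketch of counting retries by composition. */
final class ScanTakeWhileSketch {
    /** Running fold: emits the accumulator after each input (the seed itself is not emitted). */
    static <T, R> List<R> scan(List<T> source, R seed, BiFunction<R, T, R> accumulator) {
        List<R> out = new ArrayList<>();
        R acc = seed;
        for (T item : source) {
            acc = accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    /** Keeps the leading items that satisfy the predicate and stops at the first one that does not. */
    static <T> List<T> takeWhile(List<T> source, Predicate<T> predicate) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!predicate.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    /** Attempt numbers (1, 2, ...) for the errors that are allowed to trigger a retry. */
    static List<Integer> retryAttempts(List<Throwable> errors, int maxRetries) {
        // The count lives in the scan accumulator, not in a captured mutable field.
        List<Integer> attempts = scan(errors, 0, (n, e) -> n + 1);
        return takeWhile(attempts, n -> n <= maxRetries);
    }
}
```

With five errors and maxRetries set to 3, retryAttempts yields the attempt numbers 1, 2, and 3, which could then be mapped to timer delays in the same way as the flatMap above.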

Can I end it with onError() instead of onCompleted()?

OK, I finally got it working:

@Override
    public Observable<?> call(Observable<? extends Throwable> errors) {
        return errors
                .flatMap(error -> {
                    if (error instanceof IOException) {         // Network Error
                        int delayFinal = delay * retryCount;
                        retryCount++;
                        if (retryCount <= numberOfTry+1) {
                            LogController.debug("test", "retryCount " + delayFinal, null);
                            return Observable.timer((long) delayFinal, timeUnit);
                        } else {
                            return Observable.error(error);
                        }
                    }else{
                        return Observable.error(error);
                    }

                });

    }
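
To see which delays the handler above produces, here is a plain-Java sketch that replays only its counter arithmetic for the IOException branch. The snippet does not show how retryCount is initialized, so initialRetryCount is an assumed parameter here, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that replays the retry counter arithmetic from the handler above. */
final class LinearBackoffSchedule {
    /**
     * Returns the delay used for each retry before the handler gives up and
     * forwards the error. initialRetryCount is an assumption, since the
     * original snippet does not show the field's starting value.
     */
    static List<Long> delays(int delay, int numberOfTry, int initialRetryCount) {
        List<Long> out = new ArrayList<>();
        int retryCount = initialRetryCount;
        while (true) {
            int delayFinal = delay * retryCount; // computed before the increment
            retryCount++;
            if (retryCount <= numberOfTry + 1) {
                out.add((long) delayFinal);      // would become Observable.timer(delayFinal, timeUnit)
            } else {
                return out;                      // would become Observable.error(error)
            }
        }
    }
}
```

With delay = 2 and numberOfTry = 3, a starting count of 1 gives delays of 2, 4, and 6. A starting count of 0 gives 0, 2, 4, and 6, which is one retry more than numberOfTry and starts with no delay, so the initial value is worth checking.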