Rxjava: how to retry when subscriber error ?

Created on 20 Sep 2017  路  4Comments  路  Source: ReactiveX/RxJava


          final int[] a = {0};
            Observable.interval(1, TimeUnit.SECONDS).retry(new Predicate<Throwable>() {
                @Override
                public boolean test(Throwable throwable) throws Exception {
                    a[0]=0;
                    System.out.println(" reset Observable");
                    return true;
                }
            }).subscribe(new io.reactivex.functions.Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    a[0]++;
                    System.out.println(a[0]+"====="+System.currentTimeMillis());
                    if (a[0] >5){
                        /*
                        *  it will throw error  i want to retry
                        * */
                        System.out.println("a[2] = " + a[2]);
                    }
                }
            }, new io.reactivex.functions.Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    System.out.println("e.getLocalizedMessage() = " + throwable.getLocalizedMessage());
                    a[0]=0;
                }
            });

this is my log

1=====1505908635697
2=====1505908636687
3=====1505908637684
4=====1505908638684
5=====1505908639684
6=====1505908640684
e.getLocalizedMessage() = 2

If the consumer throws an error, this wording can not be retried? If i want to try again, what should I do?

2.x Question

Most helpful comment

retry can only deal with errors upstream to it. You have to reorganize your flow in a way that the operation that may throw happens before retry:

Observable.interval(1, TimeUnit.SECONDS)
.doOnNext(v -> {
   /* your potentially crashing code here */
})
.retry(e -> true)
.subscribe(v -> { /* non-crashing code here */ }, e -> { /* ... */ }}

All 4 comments

retry can only deal with errors upstream to it. You have to reorganize your flow in a way that the operation that may throw happens before retry:

Observable.interval(1, TimeUnit.SECONDS)
.doOnNext(v -> {
   /* your potentially crashing code here */
})
.retry(e -> true)
.subscribe(v -> { /* non-crashing code here */ }, e -> { /* ... */ }}

thank you very much!

Great!

But is there anyway to retry error on downstreams like subscribe(),cuz if error happens here, it'll be catch by error callback in subscribe() and interrupt the whole steam?

Was this page helpful?
0 / 5 - 0 ratings

Related issues

dimsuz picture dimsuz  路  4Comments

paulblessing picture paulblessing  路  3Comments

perlow picture perlow  路  3Comments

nltran picture nltran  路  4Comments

midnight-wonderer picture midnight-wonderer  路  3Comments