final int[] a = {0};
Observable.interval(1, TimeUnit.SECONDS).retry(new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
a[0]=0;
System.out.println(" reset Observable");
return true;
}
}).subscribe(new io.reactivex.functions.Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
a[0]++;
System.out.println(a[0]+"====="+System.currentTimeMillis());
if (a[0] >5){
/*
* it will throw error i want to retry
* */
System.out.println("a[2] = " + a[2]);
}
}
}, new io.reactivex.functions.Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("e.getLocalizedMessage() = " + throwable.getLocalizedMessage());
a[0]=0;
}
});
this is my log
1=====1505908635697
2=====1505908636687
3=====1505908637684
4=====1505908638684
5=====1505908639684
6=====1505908640684
e.getLocalizedMessage() = 2
If the consumer throws an error, this wording can not be retried? If i want to try again, what should I do?
retry can only deal with errors upstream to it. You have to reorganize your flow in a way that the operation that may throw happens before retry:
Observable.interval(1, TimeUnit.SECONDS)
.doOnNext(v -> {
/* your potentially crashing code here */
})
.retry(e -> true)
.subscribe(v -> { /* non-crashing code here */ }, e -> { /* ... */ }}
thank you very much!
Great!
But is there anyway to retry error on downstreams like subscribe(),cuz if error happens here, it'll be catch by error callback in subscribe() and interrupt the whole steam?
Most helpful comment
retrycan only deal with errors upstream to it. You have to reorganize your flow in a way that the operation that may throw happens beforeretry: