RxJava: Race condition with retryWhen()?

Created on 21 Dec 2017  ·  11 Comments  ·  Source: ReactiveX/RxJava

Here in this sample project you can see that Single.retryWhen() produces inconsistent results, depending on timing:

  • retryWhen() test method gets one less retry than expected
  • retryWhenWithDelay() test method works as expected

My suspicion is that when we do not add a time delay, onComplete is called immediately after the last onNext() call and immediately terminates the stream with NoSuchElementException.

2.x Question

All 11 comments

In some sense, there is a race condition with your retry flow. When an error is signalled, there should be a single response onNext|onError|onComplete from the transformation. However, with a range(), you'll have an onNext directly followed by onComplete, which the operator interprets as an indication to stop the entire sequence. Now if the upstream is synchronous, this will only have an effect after the last retry. If the upstream is asynchronous, you'll get a non-deterministic cutoff.

Yeah, I think I understand what you are saying.

For us this behaviour of retryWhen() led to a bug that was very difficult to understand. We spent about 3 days on it until we understood that this is how retryWhen() behaves.

The bug we had is that our stream would fail with UndeliverableException due to this behaviour. It fails on every last retry cycle. I suspect what happens is that the last onEvent() call triggers the last retry cycle, and then the stream is immediately terminated. When our code tries to emit in the last retry cycle, the stream is already terminated and we fail with UndeliverableException.

It was so painful to understand it :)

As a temporary workaround for this problem, we had to add this to our code:

                if (emitter.isDisposed()) {
                    return;
                }

There exists a builder for retryWhen in rxjava2-extras, assuming your retry patterns can be expressed by it.

I'll post a JavaDoc clarification regarding the operator shortly.

I appreciate the suggestion to add javadoc.

I do believe, though, that this may be a bit too complex to express in JavaDoc in a way that lets every user clearly understand what situation they may get themselves into. Ideally the API should protect users from getting themselves into a muddle :) Not sure if we can achieve that here, but I suggest that we at least try.

@akarnokd but as I see in the doc, you say:

"If the upstream to the operator is asynchronous, signalling onNext followed by onComplete immediately may result in the sequence to be completed immediately. Similarly, if this inner {@code Publisher} signals {@code onError} or {@code onComplete} while the upstream is active, the sequence is terminated with the same signal immediately."

In our case, the problem was that the sequence was not terminated in time. It signaled onError and then another call to the upstream would happen (upstream is async). I don't feel the new docs mention this. Maybe I don't understand it.

Please provide a minimal and standalone unit test that reproduces your problem.

@akarnokd
Sample Project


12-22 10:36:22.531 17681-17704/com.nmp90.rxjava_retry D/MainActivity: REQUEST
12-22 10:36:23.353 17681-17704/com.nmp90.rxjava_retry D/MainActivity: RETRY
12-22 10:36:26.357 17681-17704/com.nmp90.rxjava_retry D/MainActivity: REQUEST
12-22 10:36:26.579 17681-17704/com.nmp90.rxjava_retry D/MainActivity: RETRY
12-22 10:36:29.580 17681-17704/com.nmp90.rxjava_retry D/MainActivity: REQUEST
12-22 10:36:29.788 17681-17704/com.nmp90.rxjava_retry D/MainActivity: RETRY
12-22 10:36:30.790 17681-17722/com.nmp90.rxjava_retry D/MainActivity: ZIP COMPLETE
12-22 10:36:30.793 17681-17681/com.nmp90.rxjava_retry E/MainActivity: onError:                                                                
 java.util.NoSuchElementException
12-22 10:36:32.789 17681-17704/com.nmp90.rxjava_retry D/MainActivity: REQUEST

The problem is the last call to REQUEST after NoSuchElementException :+1: Any help is much appreciated.

Don't zip with a range(). See the new async example in #5773.

So the fix would be

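              .retryWhen(throwableFlowable -> {
                    // Counts the errors seen by this handler.
                    AtomicInteger counter = new AtomicInteger();
                    return throwableFlowable
                            // Let the first 3 errors through; the 4th one completes the handler.
                            .takeWhile(e -> counter.getAndIncrement() < 3)
                            .flatMap(err -> {
                                if (err instanceof IOException) {
                                    Log.d(TAG, "RETRY");
                                    // The timer's onNext triggers the resubscription after the delay.
                                    return Flowable.timer(1L, TimeUnit.SECONDS);
                                } else {
                                    // Any other error terminates the sequence with that error.
                                    Log.e(TAG, "ERROR", err);
                                    return Flowable.error(err);
                                }
                            }).doOnComplete(() -> Log.d(TAG, "ZIP COMPLETE"));
                })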

Which actually does the job.

12-22 10:53:31.823 19057-19075/com.nmp90.rxjava_retry D/MainActivity: REQUEST
12-22 10:53:32.674 19057-19075/com.nmp90.rxjava_retry D/MainActivity: RETRY
12-22 10:53:35.679 19057-19075/com.nmp90.rxjava_retry D/MainActivity: REQUEST
12-22 10:53:35.943 19057-19075/com.nmp90.rxjava_retry D/MainActivity: RETRY
12-22 10:53:38.946 19057-19075/com.nmp90.rxjava_retry D/MainActivity: REQUEST
12-22 10:53:39.189 19057-19075/com.nmp90.rxjava_retry D/MainActivity: RETRY
12-22 10:53:42.191 19057-19075/com.nmp90.rxjava_retry D/MainActivity: REQUEST
12-22 10:53:42.408 19057-19075/com.nmp90.rxjava_retry D/MainActivity: ZIP COMPLETE
12-22 10:53:42.412 19057-19057/com.nmp90.rxjava_retry E/MainActivity: onError: 
                                                                      java.util.NoSuchElementException
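
For anyone comparing the two logs, here is a rough, library-free model of the signals each handler sends, as I read the explanation in this thread. It does not use RxJava and ignores the timer delays; the class and method names (RetryHandlerTrace, zipWithRange, takeWhileCounter) are made up for illustration. The point it tries to show: zipping with range(1, 3) sends onComplete right after the third onNext, without waiting for a fourth error, while the takeWhile counter only completes once the fourth error actually arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not RxJava code: it only lists the order of handler signals.
class RetryHandlerTrace {

    // Handler that zips the error stream with a finite range of rangeCount items.
    // Once the last range item is paired, the range is exhausted, so the handler
    // completes immediately after that onNext.
    static List<String> zipWithRange(int errors, int rangeCount) {
        List<String> signals = new ArrayList<>();
        int pairs = Math.min(errors, rangeCount);
        for (int i = 1; i <= pairs; i++) {
            signals.add("error " + i + " -> onNext");
        }
        if (pairs == rangeCount) {
            signals.add("onComplete"); // not tied to any further error
        }
        return signals;
    }

    // Handler that lets the first maxRetries errors through and completes
    // only when the next error arrives.
    static List<String> takeWhileCounter(int errors, int maxRetries) {
        List<String> signals = new ArrayList<>();
        int counter = 0;
        for (int i = 1; i <= errors; i++) {
            if (counter++ < maxRetries) {
                signals.add("error " + i + " -> onNext");
            } else {
                signals.add("error " + i + " -> onComplete");
                break;
            }
        }
        return signals;
    }

    public static void main(String[] args) {
        // prints [error 1 -> onNext, error 2 -> onNext, error 3 -> onNext, onComplete]
        System.out.println(zipWithRange(4, 3));
        // prints [error 1 -> onNext, error 2 -> onNext, error 3 -> onNext, error 4 -> onComplete]
        System.out.println(takeWhileCounter(4, 3));
    }
}
```

In the first trace the completion races with the retry that the third onNext just started, which would match the stray REQUEST after NoSuchElementException in the first log. In the second trace each error gets exactly one response, so the fourth REQUEST finishes before ZIP COMPLETE.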

Thank you, I used Flowable.range because I think I saw it as an example on StackOverflow :smile:

Closing via #5773.
