RxJava: 2.x: Unexpected Flowable.flatMap(...) behavior while handling an upstream error

Created on 8 Jan 2020 · 5 Comments · Source: ReactiveX/RxJava

Hi,

I've discovered an unexpected behavior of the Flowable.flatMap() operator that can lead to a subscription leak. I've narrowed the problem down to the following scenario:

  1. RxJava version 2.2.16.
  2. Given a simple Rx chain of flowable1.flatMap(value -> flowable2).
  3. flowable1 emits a value (so that flatMap subscribes to flowable2).
  4. flowable1 emits an error.
    Expected result: flatMap disposes flowable2 upon handling the upstream error.
    Actual result: flowable2 remains subscribed after the entire Rx chain terminates because of the error.

Replacing the flatMap operator with concatMap fixes the problem (such a replacement is valid in my case). However, I decided to raise this issue because this behavior of flatMap looks quite odd to me. Could somebody from the RxJava team confirm whether my expectation is valid and the observed behavior is a defect?

I've reproduced this issue in a code snippet: https://gist.github.com/eugene-zolotko/b24cbc436bc0eab2ed5de539b9e4e312
I expected this code to print "flowable2 cancelled", but it prints "flowable2 error" instead. In addition, error2 gets thrown as an UndeliverableException because it occurs after the entire chain has terminated.

2.x 3.x Bug

All 5 comments

Sources emitting an error are considered cancelled, so it is completely legal to not call cancel on them. Use doFinally to clean up upon all sorts of termination or cancellation.
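
To illustrate the doFinally suggestion above, here is a minimal sketch against RxJava 2.x. The class name and the event labels are made up for this example. It records which callbacks fire when a source terminates with an error and when the downstream cancels a source that never terminates.

```java
import io.reactivex.Flowable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of RxJava or of the original report.
class DoFinallyCleanup {

    // Returns the lifecycle callbacks that fired, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Path 1: the source itself fails. Nobody calls cancel() on it,
        // so doOnCancel stays silent while doFinally still runs.
        Flowable.<Integer>error(new Exception("boom"))
            .doOnCancel(() -> events.add("error-path: cancel"))
            .doFinally(() -> events.add("error-path: finally"))
            .test()
            .assertError(Exception.class);

        // Path 2: the downstream cancels a source that never terminates.
        // Both doOnCancel and doFinally run.
        Flowable.<Integer>never()
            .doOnCancel(() -> events.add("cancel-path: cancel"))
            .doFinally(() -> events.add("cancel-path: finally"))
            .test()
            .cancel();

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch doFinally is the only callback that runs on both paths, which is why it is the safer place for cleanup than doOnCancel alone.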

Thank you for looking into this. But I'm afraid I wasn't specific enough in my question above. In the following code snippet I expected error2 to never get emitted, because error1 terminates the entire flow sooner than flowable2 emits error2. However, error2 was actually emitted (and triggered an UndeliverableException).

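    import io.reactivex.Flowable;
    import java.util.concurrent.TimeUnit;

    // Outer source: emits 1 after about 1 s, then fails with error1 about 1 s later.
    final Exception error1 = new Exception("1");
    final Flowable<Integer> flowable1 = Flowable.just(1).delay(1, TimeUnit.SECONDS)
        .concatWith(Flowable.<Integer>error(error1).delaySubscription(1, TimeUnit.SECONDS));

    // Inner source: fails with error2 about 2 s after flatMap subscribes to it,
    // i.e. after the outer source has already failed with error1.
    final Exception error2 = new Exception("2");
    final Flowable<Object> flowable2 =
        Flowable.error(error2).delaySubscription(2, TimeUnit.SECONDS)
            .doOnCancel(() -> System.out.println("flowable2 cancelled"))
            .doOnError(error -> System.out.println("flowable2 error"));

    // The chain terminates with error1; flowable2 is expected to be cancelled at that point.
    flowable1.flatMap(i -> flowable2).test()
        .awaitDone(4, TimeUnit.SECONDS)
        .assertError(error1);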

So my questions are:

  1. Is this an expected behavior?
  2. Is there a way to avoid throwing error2 as an UndeliverableException (except for intercepting it with RxJavaPlugins.setErrorHandler)?
  3. What's the reason for the difference in behavior between flatMap and concatMap in such conditions?

  1. No, the inner sequence should get cancelled. I'll investigate this further.
  2. Suppress the error via an onErrorX operator, or do not let RxJava know about the error in the first place.
  3. It should work the same.
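
One way to read the "onErrorX" suggestion in answer 2 is to put an operator such as onErrorResumeNext on the inner flowable, so that its error never reaches flatMap. The sketch below assumes RxJava 2.x. It swaps the delayed flowables from the snippet above for PublishProcessor instances so the ordering is deterministic, and it installs a global error handler only to observe whether anything is reported as undeliverable. The class name is hypothetical. Note that this approach also swallows error2 while the chain is still alive, which may or may not be acceptable.

```java
import io.reactivex.Flowable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of RxJava or of the original report.
class SuppressInnerError {

    // Returns the errors that reached the global RxJava error handler.
    static List<Throwable> run() {
        List<Throwable> undelivered = new ArrayList<>();
        // Used here only to observe undeliverable errors, not to hide them.
        RxJavaPlugins.setErrorHandler(undelivered::add);
        try {
            PublishProcessor<Integer> outer = PublishProcessor.create();
            PublishProcessor<Object> inner = PublishProcessor.create();
            Exception error1 = new Exception("1");
            Exception error2 = new Exception("2");

            // The inner error is replaced by an empty sequence before
            // flatMap can see it.
            TestSubscriber<Object> ts = outer
                .flatMap(i -> inner.onErrorResumeNext(Flowable.empty()))
                .test();

            outer.onNext(1);        // flatMap subscribes to the inner source
            outer.onError(error1);  // the whole chain terminates with error1
            inner.onError(error2);  // late inner failure

            ts.assertError(error1);
            return undelivered;
        } finally {
            RxJavaPlugins.reset();  // restore the default handlers
        }
    }

    public static void main(String[] args) {
        System.out.println("undelivered errors: " + run());
    }
}
```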

Closing via #6826 & #6827.

Thank you very much for your support.
