Hi,
I've discovered an unexpected behavior of the Flowable.flatMap() operator leading to a possible subscription leak. Localized my problem in the following scenario:
Replacing the flatMap operator with concatMap fixes the problem (such a replacement is valid in my case). However I decided to raise this issue because such behavior of flatMap looks quite odd to me. Could somebody from RxJava team confirm whether my expectation is valid and the observed behavior is a defect?
I've reproduced this issue in a code snippet: https://gist.github.com/eugene-zolotko/b24cbc436bc0eab2ed5de539b9e4e312
I'm expecting this code to produce "flowable2 cancelled" output. But got "flowable2 error" instead, plus error2 gets thrown as UndeliverableException because it occurs after the entire chain is terminated.
Sources emitting an error are considered cancelled so it is completely legal to not call cancel on them. Use doFinally to cleanup upon all sorts of termination or cancellation.
Thank you for looking into this. But I'm afraid I wasn't specific enough in my question above. In the following code snippet I expected error2 to never get emitted because error1 terminates the entire flow sooner that flowable2 emits error2. However error2 actually fired (triggered an UndeliverableException).
final Exception error1 = new Exception("1");
final Flowable<Integer> flowable1 = Flowable.just(1).delay(1, TimeUnit.SECONDS)
.concatWith(Flowable.<Integer>error(error1).delaySubscription(1, TimeUnit.SECONDS));
final Exception error2 = new Exception("2");
final Flowable<Object> flowable2 =
Flowable.error(error2).delaySubscription(2, TimeUnit.SECONDS)
.doOnCancel(() -> System.out.println("flowable2 cancelled"))
.doOnError(error -> System.out.println("flowable2 error"));
flowable1.flatMap(i -> flowable2).test()
.awaitDone(4, TimeUnit.SECONDS)
.assertError(error1);
So my questions are:
onErrorX or not let RxJava know about the error in the first placeClosing via #6826 & #6827.
Thank you very much for your support.