RxJava: Multiple errors in zipped observables with flatMap

Created on 26 May 2019  ·  7 Comments  ·  Source: ReactiveX/RxJava

While reading the Error handling page on the wiki, I realized that a zip operator over multiple network calls can crash with an UndeliverableException. I tried to reproduce this behavior:

RxJava version: 2.2.0

Reproduce code:

    @Test
    public void testUncaughtException() throws InterruptedException {
        Observable first = Observable.create(e -> {
            System.out.println("first");
            throw new HelperException("first exception");
        });

        Observable second = Observable.create(e -> {
            System.out.println("second");
            throw new HelperException("second exception");
        });

        List<Observable<?>> observableList = new ArrayList<>();
        observableList.add(first);
        observableList.add(second);

        Observable.zip(observableList, objects -> "result")
                .subscribeOn(Schedulers.io())
                .subscribe(
                        System.out::println,
                        t -> {
                            System.out.println("exception caught!");
                        }
                );
        Thread.sleep(2000);
    }

The output is as expected:

first
exception caught!
second
io.reactivex.exceptions.UndeliverableException: ... HelperException: second exception ...

And the second test:

    @Test
    public void testUncaughtExceptionWithFlatMap() throws InterruptedException {
        Observable testObservable = Observable.create(e -> e.onNext(""))
                .flatMap((Function<Object, ObservableSource<?>>) o -> {

                    Observable first = Observable.create(e -> {
                        System.out.println("first");
                        throw new HelperException("first exception");
                    });

                    Observable second = Observable.create(e -> {
                        System.out.println("second");
                        throw new HelperException("second exception");
                    });

                    List<Observable<?>> observableList = new ArrayList<>();
                    observableList.add(first);
                    observableList.add(second);

                    return Observable.zip(observableList, objects -> "result");
                });

        testObservable
                .subscribeOn(Schedulers.io())
                .subscribe(
                        System.out::println,
                        t -> System.out.println("exception caught!")
                );

        Thread.sleep(2000);
    }

And I expected an UndeliverableException too, but the output is:

first
exception caught!

Is this behavior correct? Why is there no UndeliverableException in the second test?

Thanks!

2.x Bug

All 7 comments

In the second case, you crash synchronously when zip subscribes to the first source and it will never subscribe to the second source.

@akarnokd thank you for the quick response! Can you please explain why the crash happens synchronously in the second case but not in the first?

Correction. In both cases, you shouldn't see any undeliverable exceptions. What happens is that flatMap issues a cancel when it detects the error which then stops the subscription to the second source. I'll investigate this further.

The Flowable.zip works properly so this is an inconsistency with Observable.zip. I'll post a fix shortly.

@akarnokd

https://github.com/ReactiveX/RxJava/issues/6487#issuecomment-496018889

So you are saying that the correct behavior is to not return an error, right?

In this synchronous case, the process should stop after the first error and the second subscription should not happen.
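
As a rough illustration of the behavior described above, here is a toy model in plain Java. It is not RxJava's implementation: the `Coordinator` class, its `cancelled` flag, and the string log are all hypothetical names made up for this sketch. It only shows the idea that if the first synchronous error marks the operator as cancelled, a subscription loop that checks that flag never reaches the second source, so there is no second error left to become undeliverable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EagerErrorSketch {

    /** Hypothetical stand-in for a zip-like coordinator; not RxJava code. */
    static final class Coordinator {
        boolean cancelled;
        final List<String> log = new ArrayList<>();

        /** The first error is delivered downstream; later ones have nowhere to go. */
        void onError(String message) {
            if (cancelled) {
                log.add("undeliverable: " + message);
                return;
            }
            cancelled = true;
            log.add("exception caught: " + message);
        }

        /** Subscribes to the sources in order, stopping once cancelled. */
        void subscribeAll(List<Consumer<Coordinator>> sources) {
            for (Consumer<Coordinator> source : sources) {
                if (cancelled) {
                    return; // the second source is never subscribed to
                }
                source.accept(this);
            }
        }
    }

    /** Runs two sources that both fail synchronously on subscription. */
    static List<String> run() {
        Coordinator coordinator = new Coordinator();
        List<Consumer<Coordinator>> sources = new ArrayList<>();
        sources.add(c -> { c.log.add("first"); c.onError("first exception"); });
        sources.add(c -> { c.log.add("second"); c.onError("second exception"); });
        coordinator.subscribeAll(sources);
        return coordinator.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the log ends after the first error. Removing the `cancelled` check from the loop would let the second source run and record its error as undeliverable, which mirrors the output of the first test.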

@akarnokd thanks for the explanation!

I am currently using onErrorResumeNext to work around that part: whenever an error occurs in an observable, I return a default value.
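
A minimal sketch of that kind of workaround, assuming RxJava 2.x and assuming the fallback is applied to each source before it is passed to zip (the comment does not say where it was applied). The class name, the `IllegalStateException` used in place of `HelperException`, and the default values are placeholders for illustration:

```java
import io.reactivex.Observable;

public class ZipFallbackSketch {

    /** Zips two failing sources after giving each one a fallback value. */
    static String zipWithFallbacks() {
        // Each source fails on subscription, like the sources in the tests above,
        // but the error is replaced with a default item before zip can see it.
        Observable<String> first = Observable.<String>create(e -> {
            throw new IllegalStateException("first exception");
        }).onErrorResumeNext(Observable.just("first default"));

        Observable<String> second = Observable.<String>create(e -> {
            throw new IllegalStateException("second exception");
        }).onErrorResumeNext(Observable.just("second default"));

        // zip only ever receives the fallback items, so it completes normally.
        return Observable.zip(first, second, (a, b) -> a + " + " + b)
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(zipWithFallbacks());
    }
}
```

Because every source handles its own error, zip never receives an `onError` signal, so no second error is left over to be reported as undeliverable. The trade-off is that the downstream subscriber no longer learns that a call failed unless the default value itself encodes that.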
