Rxjava: Unexpected behavior

Created on 8 Apr 2016  路  5Comments  路  Source: ReactiveX/RxJava

import rx.Observable;

public class RxTest {

    public static void main(String[] args) {
        Observable.<String>create(subscriber -> {
            subscriber.onNext("as");
            subscriber.onNext("asd");
        })
                .map(s -> {
                    if (s.length() < 3) {
                        throw new RuntimeException("");
                    }
                    return 1;
                })
                .flatMap(o -> {
                    System.out.println("flatMap");
                    return Observable.just(true);
                })
                .subscribe(System.out::println, t -> {});
    }
}

Output:

flatMap

But, if we comment line with throw new RuntimeException("") output will be:

flatMap
true
flatMap
true
Question

Most helpful comment

The problem is that your create ignores unsubscription and thus violates the Observable contract. If you add

     if (subscriber.isUnsubscribed()) return;

in between the two calls to onNext, nothing is printed.

All 5 comments

This is the expected behavior. map crashes and flatMap is skipped.

Wow! Be careful:

  1. emit "as": throw -> flatMap is skiped
  2. emit "asd": go to flatMap, output "flatMap", but where is output in subscribe?

The problem is that your create ignores unsubscription and thus violates the Observable contract. If you add

     if (subscriber.isUnsubscribed()) return;

in between the two calls to onNext, nothing is printed.

Oh! Thank you for explanation!

@akarnokd can the observable be unsubscribed on a different thread ? Can it get unsubscribed after the check to isUnsubscribed is done but before calling onNext?

Was this page helpful?
0 / 5 - 0 ratings