import rx.Observable;

public class RxTest {
    public static void main(String[] args) {
        Observable.<String>create(subscriber -> {
                    subscriber.onNext("as");
                    subscriber.onNext("asd");
                })
                .map(s -> {
                    if (s.length() < 3) {
                        throw new RuntimeException("");
                    }
                    return 1;
                })
                .flatMap(o -> {
                    System.out.println("flatMap");
                    return Observable.just(true);
                })
                .subscribe(System.out::println, t -> {});
    }
}
Output:
flatMap
But if we comment out the line with throw new RuntimeException(""), the output is:
flatMap
true
flatMap
true
This is the expected behavior. map crashes and flatMap is skipped.
Wow! Be careful:
throw -> flatMap is skipped
flatMap -> outputs "flatMap", but where is the output in subscribe?
The problem is that your create ignores unsubscription and thus violates the Observable contract. If you add
if (subscriber.isUnsubscribed()) return;
in between the two calls to onNext, nothing is printed.
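For reference, the suggested change might look like the sketch below. It assumes RxJava 1.x on the classpath (the rx.Observable import from the original snippet). The class name RxTestFixed, the extra check before the first onNext, and the final onCompleted call are illustrative additions, not part of the original report. As stated above, with the check in place nothing should be printed.

```java
import rx.Observable;

public class RxTestFixed {
    public static void main(String[] args) {
        Observable.<String>create(subscriber -> {
                    // Honor the contract: stop emitting once downstream has unsubscribed.
                    if (subscriber.isUnsubscribed()) return;
                    subscriber.onNext("as");   // map throws for this item
                    if (subscriber.isUnsubscribed()) return;
                    subscriber.onNext("asd");  // not reached after the failure above
                    // Assumption: the original source never completes; this call is an addition.
                    if (!subscriber.isUnsubscribed()) subscriber.onCompleted();
                })
                .map(s -> {
                    if (s.length() < 3) {
                        throw new RuntimeException("");
                    }
                    return 1;
                })
                .flatMap(o -> {
                    System.out.println("flatMap");
                    return Observable.just(true);
                })
                // The error is swallowed on purpose, as in the original snippet.
                .subscribe(System.out::println, t -> {});
    }
}
```

Commenting out the throw in this version should give the same four lines of output as before, since the subscriber stays subscribed and both items are emitted.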
Oh! Thank you for the explanation!
@akarnokd can the observable be unsubscribed on a different thread? Can it get unsubscribed after the check to isUnsubscribed is done but before calling onNext?