If I set a Scheduler in .observeOn(...) for an Observable and call subscriber.onError immediately after subscriber.onNext, onNext is never called in my Observer.
If I either add a sleep between subscriber.onNext and subscriber.onError or don't set a Scheduler in .observeOn(...), onNext is called in my Observer.
See example:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Test");
subscriber.onError(null);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onNext(String s) {
System.out.println("onNext");
}
});
// Only prints "onError", but should print "onNext" and then "onError"
True of Schedulers.io() so not Android-specific.
@mttkay I'v been digging around in the code a bit and I can see that failure = true; is set in OperatorObserveOn$ObserveOnSubscriber.onError, i.e. when the operation is queued, not executed. This seems a bit premature since it doesn't take into consideration the order in which the operations are called, thus setting an overall state of failure before we actually hit the onError object in the polling queue.
What about setting this state inside pollQueue instead whenever we hit an onError object? This way the state is set in the correct order and the queue is executed in a deterministic way.
This is the expected behavior: errors cut ahead of normal values to fail early.
@akarnokd I get it if you call onNext -> onError -> onNext, then the second onNext should not be called, but if you call onNext -> onError one would expect the first call to onNext to go through, right?
Currently this behaviour is not deterministic and depends on when pollQueue is called.
Sometimes onNext is called other times it's not called.
In my case I ask a database for an entry and then call subscriber.onNext. Immediately after I try to request a server for the same entry to check if it's updated. If there is no connection I immediately get an exception and call subscriber.onError.
When this happens I'd like my observer to use whatever was sent in the first call to onNext so I can show cached content, but currently this is not possible because onError is called prematurely.
How would I then do something like that?
The best you can do is to turn that exception into a regular value with onErrorReturn or onErrorResumeNext so it won't preempt the other values; however, you need a way to comine the regular value type with an error-indicating type (i.e., Notification<Result>).
Deal with the error before you do observeOn, or materialize and dematerialize across. The rationale for this behavior was a case where the receiving side of the observeOn was much slower than the sending side. In that case it is surprising if an error takes a long time to propagate.
So really it is a choice between two surprising behaviors.
@headinthebox Not sure what you meant by dealing with the error before I do observeOn, but it seems like the best compromise here would be to use onErrorReturn with Notification<Result>, even though that clutters the code with a lot of seemingly unnecessary handling.
Something like this:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Test");
subscriber.onError(null);
}
}).materialize().observeOn(AndroidSchedulers.mainThread()).<String>dematerialize()
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onNext(String s) {
System.out.println("onNext");
}
});
Ahh, I wasn't aware of the materialize/dematerialze operators. Thanks!
Since this probably will stay as is I guess this is the best compromise I can work with.
@akarnokd I met the same situation as @mariusgreve .
Observable.create((ObservableOnSubscribe<Response>) e ->{
//First check cache.if exits
e.onNext(createResponseFromLocalFile(request, file));
//then use local cache-time go to server check is cache can use
}.map( response -> {
return praseResponseToJson();
}).doOnNext(jsonObject ->{
//do some thing like cache server result and check result errors
}).onErrorResumeNext(throwable ->{
//because I have some custom exception,I need process them first
if (throwable instanceof SomeException)
return Observable.error(new NetWorkException(throwable));
else
return Observable.error(throwable);
}).retryWhen()//retry maxRetryTimes when failed
..observeOn()
When there is no network,I will not receive cache result because exception occur immediately, How to use materialize/dematerialze operators in my situation.
Note to future self because I know I'm going to forget this (again): you can use .observeOn(Scheduler scheduler, boolean delayError) to ensure onNext isn't skipped by onError.
Most helpful comment
Note to future self because I know I'm going to forget this (again): you can use
.observeOn(Scheduler scheduler, boolean delayError)to ensure onNext isn't skipped by onError.