(I'm learning reactive)
My source:
This is what I want:
I've been able to achieve almost all of this by playing with Subjects and the RxRelay library. The last two points, in bold, are the ones I can't achieve.
In the triggerRefresh method of my helper class I use RxRelay with a PublishRelay and a ReplaySubject with capacity 1:
    coldObservable
        .materialize()
        .subscribe(relay);
    relay
        .filter(notification -> notification.getKind() != Notification.Kind.OnCompleted)
        .dematerialize()
        .subscribe(subject);
I execute this code every time I need to perform a new HTTP call. I also keep a reference to the two subscriptions and unsubscribe them before doing it (I just didn't include that part of the code).
As the hot observable I simply pass subject.asObservable();
It works as I want unless an error happens.
When an error occurs, all the Observers stop receiving events, even if I trigger a refresh.
How should I handle this kind of situation with Rx?
Have you looked at the retry operator?
@JohnWowUs yes, that's not what I'm looking for. The retry operator works transparently for the Observers. What I need here is not a retry: I need the error to reach the Observers while the subscription stays alive for the next value or error.
I've looked into this for 2 days now. I think there's no way to do it.
I'm still convinced it should be possible :)
The nearest thing is probably to use the materialize operator on the observable, pass it to a BehaviorRelay or some other kind of relay (Jake Wharton's RxRelay library), and use the scan operator on the relay to get objects containing the error, the values and, eventually, the state (loading etc.).
I don't think RxRelay will propagate errors.
It seems like a lot of work simply to propagate retrofit(?) errors down to the subscribers and have the observable continue emitting. Why exactly do you want the subscribers to see the errors? Other than that, all your requirements can be met I think.
I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
@akarnokd ok, thanks. In the end I used .materialize(), mapped it to my custom Event object, used a scan() to keep the last data in it across events, then sent it to an RxRelay BehaviorRelay. It works fairly well for now. I described what I did (removing some detail) here: https://groups.google.com/forum/#!topic/rxjava/10lHqg9TE4M
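For anyone reading later, here is a minimal sketch of that Event-plus-scan idea. It is an illustration only, not the code from the linked post: the Event class, its fields and the fold helper are hypothetical names, and a plain Java loop stands in for scan() so that no RxJava types are needed. Each result is either a value or a Throwable (roughly what materialize() would hand you), and the fold keeps the last good data when an error arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical state object; none of these names come from RxJava or RxRelay.
final class Event<T> {
    final T lastData;       // most recent successful value, kept across errors
    final Throwable error;  // error from the latest call, or null if it succeeded

    Event(T lastData, Throwable error) {
        this.lastData = lastData;
        this.error = error;
    }

    static <T> Event<T> empty() {
        return new Event<>(null, null);
    }

    // A new value replaces the data and clears any previous error.
    Event<T> onValue(T value) {
        return new Event<>(value, null);
    }

    // An error is recorded, but the last good data is preserved.
    Event<T> onError(Throwable t) {
        return new Event<>(lastData, t);
    }
}

final class EventDemo {
    // Plain-Java stand-in for scan(): folds results (a String or a Throwable)
    // into the sequence of Event states a subscriber would see.
    static List<Event<String>> fold(List<Object> results) {
        List<Event<String>> out = new ArrayList<>();
        Event<String> state = Event.empty();
        for (Object r : results) {
            state = (r instanceof Throwable)
                    ? state.onError((Throwable) r)
                    : state.onValue((String) r);
            out.add(state);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> results = new ArrayList<>();
        results.add("first");
        results.add(new RuntimeException("HTTP 500"));
        results.add("second");
        for (Event<String> e : fold(results)) {
            System.out.println(e.lastData + " / " + (e.error == null ? "ok" : e.error.getMessage()));
        }
    }
}
```

Because the error travels as ordinary data inside the Event, the stream that carries it never sees a terminal onError, which is why the subscribers stay subscribed.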
I'm facing the same issue, I feel like this is a fairly common use case. Take a simple stream of click events that you flatMap to a stream of something you get from the network using Retrofit.
OnError I'd like to inform my view that something is wrong, but I would like my button to continue triggering network calls.
In a more complex scenario, if I have multiple subscribers observing that observable, they will all stop working after an onError, and I'll have to create a new observable and register all my subscribers again.
Before the Observable is returned from the flatMap, add an .onError...(...) operator to it. It's like the try/catch of Rx.
As I understand it, .onError...(...) will call onComplete right away afterwards (shutting down your observable).
Quoting Dan Lew's article: http://blog.danlew.net/2015/12/08/error-handling-in-rxjava/
"It's worth noting that, when using these operators, the upstream Observables are still going to shut down! They've already seen a terminal event (onError); all that onError[Return|ResumeNext] does is replace the onError notification with a different sequence downstream."
The pattern abersnaze is referring to is the following (it swallows all errors; you can also log etc. inside the flatMap using either doOnError(...) or the onErrorResumeNext(...)):
    observableThatShouldNotCompleteOnError
        .flatMap(
            e ->
                observableThatMayThrow(e)
                    .onErrorResumeNext(Observable.empty())
        )
By trapping the error in the inner observable, all the outer subscription sees is the non-error cases, so it never terminates.
That's exactly it! Thank you both! @abersnaze @jamesgorman2
A pattern is also needed for when the error happens in the Consumer. Is there nothing better than a try-catch?
@feresr hello, can you show me your code? Thanks.
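To make the try-catch option concrete, here is a rough sketch of wrapping it once in a reusable helper instead of repeating it in every consumer. It assumes java.util.function.Consumer as a stand-in for the RxJava Consumer type, and GuardedConsumer and guarded are made-up names, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper; not an RxJava API.
final class GuardedConsumer {
    // Wraps a consumer so that an exception thrown while handling one item
    // is passed to onFailure instead of propagating to the caller.
    static <T> Consumer<T> guarded(Consumer<T> delegate, Consumer<Throwable> onFailure) {
        return item -> {
            try {
                delegate.accept(item);
            } catch (RuntimeException e) {
                onFailure.accept(e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> failures = new ArrayList<>();
        Consumer<Integer> printer = guarded(
                i -> {
                    if (i == 2) {
                        throw new IllegalStateException("cannot handle " + i);
                    }
                    System.out.println("handled " + i);
                },
                t -> failures.add(t.getMessage()));
        // Item 2 fails inside the consumer, but items 1 and 3 are still handled.
        for (int i = 1; i <= 3; i++) {
            printer.accept(i);
        }
        System.out.println("failures: " + failures);
    }
}
```

This is still a try-catch under the hood; it only moves the boilerplate into one place.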
@SrDino
    Observable
        .create(...)
        .switchMap((value) =>
            Observable.of(value)
                .catch((err) => Observable.of(objectInsteadOfError))
        )
@jamesgorman2
I'm looking for exactly that behavior, but I cannot reproduce your solution.
Here is the code, which I would expect to print all items except the one that is mapped to an exception.
Instead, the error is propagated downstream and the sequence stops after the last successfully mapped item.
    public void resumeObservableOnError() {
        Observable.range(1, 3)
            .flatMap(item -> observableThatMayThrow(item)
                .onErrorResumeNext(Observable.empty())
            )
            .blockingSubscribe(
                next -> System.out.println(next),
                ex -> System.out.println("error: " + ex.getMessage()),
                () -> System.out.println("completed")
            );
    }

    private Observable<Integer> observableThatMayThrow(Integer i) {
        if (i == 2) {
            throw new RuntimeException(String.format("failure on value %d", i));
        }
        return Observable.just(i);
    }
The output is:
1
error: failure on value 2
Here is the output I want:
1
3
completed
Can you point me to what I got wrong?
I'm using rxjava 2.2.8 here.
You are throwing from the method observableThatMayThrow that is outside of an RxJava chain. You could have just written flatMap(item -> { throw new RuntimeException(); }) which fails the mapper function and no inner source is ever involved.
You should embed the failure in an RxJava flow via the error() operator, not throw it:
    private Observable<Integer> observableThatMayThrow(Integer i) {
        if (i == 2) {
            return Observable.error(
                new RuntimeException(String.format("failure on value %d", i))
            );
        }
        return Observable.just(i);
    }
This way, the error is signaled and onErrorResumeNext can turn it into an empty source.
How can this code be modified?
    Observable.just(1, 2, 3, 4)
        .doOnNext(r -> {
            // calculate result, maybe throw exception_1
        })
        .doOnNext(r -> {
            // cache result, maybe throw exception_2
        })
        .doOnNext(r -> {
            // cache result, maybe throw exception_3
        })
        .subscribe();
Throwing an exception in a Mono and then subscribing doesn't allow the exception to bubble up. I need a way to bubble up the exception.
Example:
    client1.getDetails() // returns Mono
        .map(details ->
            client2.submitDetails(details) // throws an exception to bubble up
        ).subscribe(aVoid -> {
        }, error -> {throw new RuntimeException(error);})) . // I wish the exception to bubble up
        .doOnError(error -> log.error("Error thrown from first subscriber", error))
        .doOnSuccess(data -> log.info("Successfully sent the details", data)) // instead a success is registered here
        .subscribe(aVoid -> {
        }, error -> {throw new RuntimeException(error);})) . // it doesn't reach here with an exception