Not sure if this is intended behavior, but when using a PublishSubject (and any other subject) the error seems to be cached, so each time I subscribe to the subject it propagates that error again. Additionally, subsequent calls to onNext are ignored.
PublishSubject<String> subject = PublishSubject.create();
subject.onError(new RuntimeException());
subject.subscribe(
System.out::println,
System.err::println
);
subject.onNext("Hello World!"); // never emitted
If this is intended behavior, how would I go about implementing something that emits errors but will continue normally afterwards?
I've resolved in the mean time to using two subjects: one that emits the onNext, one that emits the onError:
PublishSubject<String> successSubject = PublishSubject.create();
PublishSubject<Throwable> errorSubject = PublishSubject.create();
observable.subscribe(
successSubject::onNext,
errorSubject::onNext
);
This is by design. If you need to send multiple error events, you need to wrap the values into a container such as Notification.
Thanks for the response. I'll look into Notifications.
@akarnokd I understand the reasons behind this, but following your advice would mean unwrapping that container object inside onNext() and checking whether or not it is of kind error, then dealing with the error from within that context. That kind of feels strange to me, is there any other way to workaround this?
If you don't want to wrap onNext events you can always have an Observable
that emits errors but will continue normally afterwards?
Errors are terminal events and everything shuts down. That is the contract, similar to throwing an exception. If terminal behavior is not wanted, then don't emit an error via 'onError' but instead as a message through 'onNext' since it's not an actual error (a terminal event) if multiple can be sent.
As @akarnokd stated, materialize()/ dematerialize() are available to turn terminal events into Notifications.
You can also decouple streams so one terminates and is then restarted every time an error is received by using retry(), like this: https://gist.github.com/benjchristensen/04eef9ca0851f3a5d7bf
@akarnokd doing that doesn't seem much better, since now your type parameter would need to be Object rather than the expected type. It's a murky solution either way. What would be nice is a way to reset an Observable after it has been terminated, either by onComplete or onError, by just putting it back in its initial state, but keep its subscribers. This can be done manually, but an Rx solution would be handy.
It sounds like this flies in the face of the designed contracts, though.
Why just not have onNext and onError methods?
The Observable protocol allows only at most 1 error signal. This makes it unambiguous when a flow has ended. With multiple errors, there is generally no way to know if there could be more signals or your source crashed for good. Also it would make cold retries difficult: which of the arbitrary number or type of error should trigger a complete retry?
What you may be looking for is multi-value flows which, until Java gets value types, you have to emulate with tuples/record classes.
@akarnokd thanks for the comment, I've actually using _RxSwift_ and was comparing the _Rx_ interface with more usual _Promises_ or _Futures_ interfaces. They look a bit nicer at the call site, as I may want just propagate the error further, but handle Success a bit differently.
Most helpful comment
@akarnokd doing that doesn't seem much better, since now your type parameter would need to be
Objectrather than the expected type. It's a murky solution either way. What would be nice is a way to reset an Observable after it has been terminated, either by onComplete or onError, by just putting it back in its initial state, but keep its subscribers. This can be done manually, but an Rx solution would be handy.It sounds like this flies in the face of the designed contracts, though.