Rxjava: 2.x CompletableCreate.Emitter throws unhandled exception if disposed

Created on 2 May 2017  ·  10Comments  ·  Source: ReactiveX/RxJava

RxJava 2.1.0

A Completable which was created using a CompletableEmitter and Completable.create(CompletableOnSubscribe) will throw an unhandled exception if the used creation block calls emitter.onError(e) _after_ the Completable has been disposed. This should be considered a bug, since it is not possible to check whether the emitter is disposed _and_ call its onError() method as one atomic operation (which gives room for race conditions).

Here's an example reproducing this issue:

public class CompletableEmitterException {

    static Disposable disposable;

    public static void main(String[] args) throws Exception {
        Completable completable = Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(@NonNull CompletableEmitter e) throws Exception {
                disposable.dispose();
                e.onError(new RuntimeException("Always fails!"));
            }
        });

        disposable = completable
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("Completed successfully. This should not happen.");
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) throws Exception {
                    System.out.println("Completable emitted error. This is what we would expect!");
                }
            });

        Thread.currentThread().wait();
    }
}

which terminates the application with following exception:

Exception in thread "main" java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at at.droiddave.rxjava2.CompletableEmitterException.main(CompletableEmitterException.java:43)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
io.reactivex.exceptions.UndeliverableException: java.lang.RuntimeException: Always fails!
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
    at io.reactivex.internal.operators.completable.CompletableCreate$Emitter.onError(CompletableCreate.java:92)
    at at.droiddave.rxjava2.CompletableEmitterException$1.subscribe(CompletableEmitterException.java:23)
    at io.reactivex.internal.operators.completable.CompletableCreate.subscribeActual(CompletableCreate.java:39)
    at io.reactivex.Completable.subscribe(Completable.java:1635)
    at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64)
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
Process finished with exit code 1

The typical scenario hitting this issue is a long-running completable block, which fails _after_ some external event disposed the completable.

2.x Question

Most helpful comment

https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling

All 10 comments

The issue is actually falling back to propagating the exception globally if the completable has been disposed, here: https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/internal/operators/completable/CompletableCreate.java#L92

This is by design. Since such errors are the results of the emitter code block by the developer, RxJava can't know how important the error is but can't violate the protocol either, thus signalling the error globally. You should check isDisposed before trying to emit to a consumer that might no longer be interested in receiving errors.

This is right, which is the fix I applied. But still, there is a small probability of being disposed in between the isDisposed check and the onError() call, which would cause the same exception – with no real solution to avoid that, as far as I understand.

Install a no-op error handler on the RxJavaPlugins and all such errors go away...

Will do that 👍

Is the current behavior documented? I did not find anything about it. Would be useful for others too.

https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling

Then it becomes impossible to detect actual problems in error handlers, no?

On Tue, May 2, 2017, 11:47 AM David Karnok notifications@github.com wrote:

Install a no-op error handler on the RxJavaPlugins and all such error go
away...


You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/issues/5330#issuecomment-298672175,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEETtcFMuRZWO7pAH5ZBfdQoX68Qneks5r101ugaJpZM4NOSeq
.

You could probably use specialized exception types to deliver that information.

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

Was this page helpful?
0 / 5 - 0 ratings