RxJava 2.x: Single, toObservable and share: sometimes NoSuchElementException

Created on 16 May 2018 · 2 Comments · Source: ReactiveX/RxJava

Hello. I intermittently get a NoSuchElementException with RxJava version 2.1.12.

Code:

    Single<String> single = Single.just("12345")
          .toObservable()
          .share()
          .singleOrError();

    for (int i = 0; i < 10; i++) {
       new Thread(() -> {
          single.subscribe(t -> {
             // value received, nothing to do
          }, thrw -> {
             // only report the intermittent failure
             if (NoSuchElementException.class.equals(thrw.getClass())) {
                thrw.printStackTrace();
             }
          });
       }).start();

       // pause after every second thread to vary the timing
       if (i % 2 != 0) {
          Thread.sleep(4);
       }
    }

Stacktrace:

    at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java:47)
    at io.reactivex.internal.operators.observable.ObservableSingleSingle$SingleElementObserver.onComplete(ObservableSingleSingle.java:113)
    at io.reactivex.internal.operators.observable.ObservableRefCount$ConnectionObserver.onComplete(ObservableRefCount.java:145)
    at io.reactivex.internal.operators.observable.ObservablePublish$PublishObserver.onComplete(ObservablePublish.java:193)
    at io.reactivex.internal.observers.DeferredScalarDisposable.complete(DeferredScalarDisposable.java:85)
    at io.reactivex.internal.operators.single.SingleToObservable$SingleToObservableObserver.onSuccess(SingleToObservable.java:73)
    at io.reactivex.internal.operators.single.SingleJust.subscribeActual(SingleJust.java:30)
    at io.reactivex.Single.subscribe(Single.java:3220)
    at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:36)
    at io.reactivex.Observable.subscribe(Observable.java:12005)
    at io.reactivex.internal.operators.observable.ObservablePublish.connect(ObservablePublish.java:116)
    at io.reactivex.internal.operators.observable.ObservableRefCount.subscribeActual(ObservableRefCount.java:68)
    at io.reactivex.Observable.subscribe(Observable.java:12005)
    at io.reactivex.internal.operators.observable.ObservableSingleSingle.subscribeActual(ObservableSingleSingle.java:35)
    at io.reactivex.Single.subscribe(Single.java:3220)
    at io.reactivex.Single.subscribe(Single.java:3206)

Is this a bug, or is my code incorrect? The for loop is only there to demonstrate the problem.

2.x Question

All 2 comments

This is a plausible outcome when using share. The underlying publish operator emits items only to observers that are subscribed at that moment. With the race in your code, some observers may subscribe after "12345" has been emitted but just before onComplete is signalled, so they encounter an empty sequence, which singleOrError reports as a NoSuchElementException.

Use replay(1).refCount() to ensure you don't lose that single item.
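
The difference can be reproduced without threads. The following is a minimal sketch, assuming RxJava 2.x is on the classpath. It swaps Single.just for a PublishSubject so the moment between the item and onComplete can be controlled by hand; the class name ShareVsReplayDemo and the helper lateSubscriberOutcome are illustrative and not part of the original report.

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.subjects.PublishSubject;

import java.util.function.Function;

public class ShareVsReplayDemo {

    /**
     * Subscribes twice, with the second subscriber arriving after the item
     * was emitted but before onComplete, and returns what the late one saw.
     * The "sharing" argument is a hypothetical hook for swapping operators.
     */
    static String lateSubscriberOutcome(
            Function<Observable<String>, Observable<String>> sharing) {
        // Stand-in for the upstream; lets us pause between onNext and onComplete.
        PublishSubject<String> source = PublishSubject.create();
        Single<String> single = sharing.apply(source).singleOrError();

        String[] late = new String[1];

        // Early subscriber: triggers the connection to the source.
        single.subscribe(v -> { }, e -> { });

        // The only item is emitted while just the early subscriber is attached.
        source.onNext("12345");

        // Late subscriber: joins the already running connection.
        single.subscribe(
                v -> late[0] = "value: " + v,
                e -> late[0] = "error: " + e.getClass().getSimpleName());

        // Completion reaches both subscribers.
        source.onComplete();
        return late[0];
    }

    public static void main(String[] args) {
        System.out.println("share():              "
                + lateSubscriberOutcome(Observable::share));
        System.out.println("replay(1).refCount(): "
                + lateSubscriberOutcome(o -> o.replay(1).refCount()));
    }
}
```

With share(), the late subscriber only sees onComplete and should end up with a NoSuchElementException; with replay(1).refCount(), the cached item is replayed to it first, so it should receive "12345".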

Thanks, it works!
