Rxjava: NullPointerException caused by null elements in merge() queue

Created on 29 Mar 2016  路  2Comments  路  Source: ReactiveX/RxJava

Take a look at this (admittedly somewhat wonky) sample stream:

Observable.range(0, 2)
    .flatMap(__ -> Observable.just(null))
    .subscribe(new Subscriber<Object>() {
      @Override public void onNext(Object o) {
        System.out.println("onNext(" + o + ")");
      }

      @Override public void onError(Throwable e) {
        e.printStackTrace();
      }

      @Override public void onStart() {
        request(1);
      }

      @Override public void onCompleted() { }
    });

In this sample I'm purposefully only requesting one item so that the second one ends up getting queued in the internal OperatorMerge. As a result of passing null to the queue, though, I end up getting this in onError:

java.lang.NullPointerException
    at rx.internal.util.atomic.SpscUnboundedAtomicArrayQueue.offer(SpscUnboundedAtomicArrayQueue.java:71)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.queueScalar(OperatorMerge.java:465)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:437)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:228)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
    at rx.internal.operators.OnSubscribeRange$RangeProducer.fastpath(OnSubscribeRange.java:126)
    at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:63)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:38)
    at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:26)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable.subscribe(Observable.java:8191)
    at rx.Observable.subscribe(Observable.java:8158)
    at net.danlew.experiments.Tester.main(Tester.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: 1
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109)
    at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:187)
    at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
    ... 18 more

I'm running into in more complex code where the requests # is different than the # items pushed and seeing the same problem.

Bug

Most helpful comment

Closing via #3809.

All 2 comments

This is a bug on the entry side of the scalar-queue not encoding the null value. Fix posted in #3809.

Closing via #3809.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

dlew picture dlew  路  4Comments

dzharikhin picture dzharikhin  路  4Comments

midnight-wonderer picture midnight-wonderer  路  3Comments

nltran picture nltran  路  4Comments

hoc081098 picture hoc081098  路  3Comments