RxJava 2.1.0: Flowable.unsafeCreate combined with the .filter operator results in NPE

Created on 30 May 2017 · 9 Comments · Source: ReactiveX/RxJava

Suppose for a moment that I don't care about backpressure and create a Flowable with the unsafeCreate factory method. This works for the simple case. However, if I apply a filter to that Flowable, it results in an NPE.

Even if using the .filter operator on an unsafe Flowable could be dangerous and this is by design, I'd rather get a meaningful error instead of a random NullPointerException.

Here are two Spock tests: the first passes, the second fails with an NPE.

```
class FlowableExample extends Specification {

def "flowable with unsafe create should work"() {
    given: 'flowable created with unsafe create'
    def numbers = Flowable.unsafeCreate({ subscriber ->
        subscriber.onNext(1)
        subscriber.onNext(2)
        subscriber.onComplete()
    })

    expect: 'subscriber to get all numbers'
    numbers
            .test()
            .assertValues(1, 2)
            .assertComplete()
}

def "flowable with unsafe create and applied filter should work"() {
    given: 'flowable created with unsafe create'
    def numbers = Flowable.unsafeCreate({ subscriber ->
        subscriber.onNext(1)
        subscriber.onNext(2)
        subscriber.onComplete()
    })

    when: 'filter applied'
    def shouldContainNoItems = numbers.filter({ whatever -> false })

    then: 'subscriber should not get filtered numbers'
    shouldContainNoItems
            .test()
            .assertNoValues()
            .assertComplete()
}

}
```

And here's the stack trace:

```
java.lang.NullPointerException
at io.reactivex.internal.operators.flowable.FlowableFilter$FilterSubscriber.onNext(FlowableFilter.java:53)
at com.example.FlowableExample.flowable with unsafe create and applied filter should work_closure2(FlowableExample.groovy:27)
at groovy.lang.Closure.call(Closure.java:423)
at io.reactivex.internal.operators.flowable.FlowableFromPublisher.subscribeActual(FlowableFromPublisher.java:29)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.internal.operators.flowable.FlowableFilter.subscribeActual(FlowableFilter.java:37)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.Flowable.test(Flowable.java:16182)
at com.example.FlowableExample.flowable with unsafe create and applied filter should work(FlowableExample.groovy:36)

```

2.x Question

All 9 comments

> create Flowable with unsafeCreate factory method

Why? What's wrong with the standard create() method, or using Observable.create if you don't need backpressure?

From the Javadoc:

Create a Flowable by wrapping a Publisher which has to be implemented according to the Reactive-Streams specification by handling backpressure and cancellation correctly; no safeguards are provided by the Flowable itself.

From the specification:

In response to a call to Publisher.subscribe(Subscriber) the possible invocation sequences for methods on the Subscriber are given by the following protocol:

onSubscribe onNext* (onError | onComplete)?

This means that onSubscribe is always signalled, followed by a possibly unbounded number of onNext signals (as requested by Subscriber) followed by an onError signal if there is a failure, or an onComplete signal when no more elements are available—all as long as the Subscription is not cancelled.
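The ordering rule quoted above can be checked mechanically. What follows is only a minimal sketch under stated assumptions, not RxJava code: it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the Reactive Streams ones, and the names `ProtocolSketch`, `Checker`, and `firstViolation` are hypothetical. The demo publisher emits the same signals as the closure in the issue, optionally skipping `onSubscribe`.

```java
import java.util.concurrent.Flow;

// Hypothetical sketch (not RxJava code): checks the signal order
// onSubscribe onNext* (onError | onComplete)? seen by one subscriber.
final class ProtocolSketch {

    // Subscription that ignores demand and cancellation; good enough for this demo only.
    static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    static final class Checker implements Flow.Subscriber<Integer> {
        private boolean subscribed;
        private boolean terminated;
        String violation; // first violation seen, or null if none

        private void check(String signal) {
            if (violation != null) return; // keep only the first violation
            if (!subscribed) violation = signal + " before onSubscribe";
            else if (terminated) violation = signal + " after terminal signal";
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            if (subscribed && violation == null) violation = "onSubscribe called twice";
            subscribed = true;
        }
        @Override public void onNext(Integer item) { check("onNext"); }
        @Override public void onError(Throwable t) { check("onError"); terminated = true; }
        @Override public void onComplete() { check("onComplete"); terminated = true; }
    }

    // Emits 1, 2, onComplete; optionally skips onSubscribe like the code in the issue.
    // Note: this publisher also ignores demand, which the checker does not verify.
    static String firstViolation(boolean callOnSubscribe) {
        Checker checker = new Checker();
        Flow.Publisher<Integer> publisher = subscriber -> {
            if (callOnSubscribe) subscriber.onSubscribe(NOOP);
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onComplete();
        };
        publisher.subscribe(checker);
        return checker.violation;
    }

    public static void main(String[] args) {
        System.out.println(firstViolation(true));  // null
        System.out.println(firstViolation(false)); // onNext before onSubscribe
    }
}
```

The checker only looks at ordering. A fully compliant publisher would also have to respect the amounts passed to `request` and stop after `cancel`, which this sketch deliberately leaves out.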

> What's wrong with the standard create() method, or using Observable.create if you don't need backpressure?

@slnowak, yup, if you don't care about backpressure, just use Flowable.create(emitter -> {}, BackpressureStrategy.MISSING).

However, if you're writing some kind of API, it's probably better to let consumers pass the backpressure strategy they want as a parameter for each particular use case.

> if you don't care about backpressure, just use Flowable.create(emitter -> {}, BackpressureStrategy.MISSING)

That's bad advice, @artem-zinnatullin. MISSING means "I know what I'm doing, this won't overflow anything" or "I'll apply a custom onBackpressureX operator later".

Well, there are cases when you don't "care" about backpressure but still need a Flowable, e.g. when you need a Reactive Streams-compatible type.

Isn't "don't care about backpressure" ~= "I know what I'm doing, this won't overflow anything" or "I'll apply a custom onBackpressureX operator later"?

Though, @akarnokd, I think your comment ^ is still very helpful here, since this issue will probably receive a fair number of Google search hits, and the discussion will help readers think more carefully about backpressure and the RxJava/RS type system.

Are you trying to convince me right now that getting an NPE is expected behavior? ;) Because that's what the issue is all about. I shouldn't have mentioned backpressure in my first post at all; I just wanted to let you guys know there's probably a bug in 'unsafeCreate'.

But I really appreciate the discussion :)

You get an NPE because you didn't call onSubscribe first. When using unsafeCreate, it is your duty to follow the protocol of the target reactive type. Also, unsafeCreate is for advanced use, and you should definitely use create() instead.
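To make the failure mode concrete, here is a minimal sketch of how a filtering subscriber can end up dereferencing a null subscription. It is an illustration under assumptions, not RxJava's actual FlowableFilter source: it uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), and `FilterSketch`, `FilterSubscriber`, `Collector`, and `run` are hypothetical names. The assumed design is that the subscriber stores the upstream subscription in `onSubscribe` and requests a replacement item whenever the predicate rejects one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical sketch (not RxJava's source): a filtering subscriber that
// relies on onSubscribe having been called before onNext.
final class FilterSketch {

    // Subscription that ignores demand and cancellation; good enough for this demo only.
    static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    static final class FilterSubscriber<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;
        private final Predicate<? super T> predicate;
        private Flow.Subscription upstream; // stays null if onSubscribe is skipped

        FilterSubscriber(Flow.Subscriber<? super T> downstream, Predicate<? super T> predicate) {
            this.downstream = downstream;
            this.predicate = predicate;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(s);
        }
        @Override public void onNext(T item) {
            if (predicate.test(item)) {
                downstream.onNext(item);
            } else {
                upstream.request(1); // ask for a replacement; NPE if upstream is null
            }
        }
        @Override public void onError(Throwable t) { downstream.onError(t); }
        @Override public void onComplete() { downstream.onComplete(); }
    }

    // Records what reaches the end of the chain.
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> items = new ArrayList<>();
        boolean completed;
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { completed = true; }
    }

    // Sends 1, 2, onComplete through a reject-everything filter,
    // optionally skipping onSubscribe like the closure in the issue.
    static String run(boolean callOnSubscribe) {
        Collector collector = new Collector();
        Flow.Subscriber<Integer> filter = new FilterSubscriber<Integer>(collector, x -> false);
        try {
            if (callOnSubscribe) filter.onSubscribe(NOOP);
            filter.onNext(1);
            filter.onNext(2);
            filter.onComplete();
        } catch (NullPointerException e) {
            return "NullPointerException";
        }
        return "completed=" + collector.completed + ", items=" + collector.items;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // completed=true, items=[]
        System.out.println(run(false)); // NullPointerException
    }
}
```

In terms of the original test, this suggests the closure passed to unsafeCreate would need to call subscriber.onSubscribe with a Subscription before the first onNext. A compliant publisher would also have to honor the requested amounts, which is the work create() takes off your hands.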

OK, thanks for the help! You can close the issue :)

NPE to communicate one is not following protocol is the cleanest solution :)

Using a method that has "unsafe" clearly in its name is not the cleanest way of programming either.
