Consider for a while that I don't care about backpressure and create Flowable with createUnsafe factory method. This works for simple case. However, if I apply a filter to that flowable, it results with NPE.
Event if using .filter combinator on unsafe Flowable could be dangerous and this is by design, I'd rather get meaningful error instead of random NullPointerException.
Here are 2 spock tests - first passes, second one fails with NPE.
```
class FlowableExample extends Specification {
def "flowable with unsafe create should work"() {
given: 'flowable created with unsafe create'
def numbers = Flowable.unsafeCreate({ subscriber ->
subscriber.onNext(1)
subscriber.onNext(2)
subscriber.onComplete()
})
expect: 'subscriber to get all numbers'
numbers
.test()
.assertValues(1, 2)
.assertComplete()
}
def "flowable with unsafe create and applied filter should work"() {
given: 'flowable created with unsafe create'
def numbers = Flowable.unsafeCreate({ subscriber ->
subscriber.onNext(1)
subscriber.onNext(2)
subscriber.onComplete()
})
when: 'filter applied'
def shouldContainNoItems = numbers.filter({ whatever -> false })
then: 'subscriber should not get filtered numebrs'
shouldContainNoItems
.test()
.assertNoValues()
.assertComplete()
}
}
and here's the stacktrace:
java.lang.NullPointerException
at io.reactivex.internal.operators.flowable.FlowableFilter$FilterSubscriber.onNext(FlowableFilter.java:53)
at com.example.FlowableExample.flowable with unsafe create and applied filter should work_closure2(FlowableExample.groovy:27)
at groovy.lang.Closure.call(Closure.java:423)
at io.reactivex.internal.operators.flowable.FlowableFromPublisher.subscribeActual(FlowableFromPublisher.java:29)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.internal.operators.flowable.FlowableFilter.subscribeActual(FlowableFilter.java:37)
at io.reactivex.Flowable.subscribe(Flowable.java:12986)
at io.reactivex.Flowable.test(Flowable.java:16182)
at com.example.FlowableExample.flowable with unsafe create and applied filter should work(FlowableExample.groovy:36)
```
create Flowable with createUnsafe factory method
Why? What's wrong with the standard create() method, or using Observable.create if you don't need backpressure?
From the Javadoc:
Create a Flowable by wrapping a Publisher which has to be implemented according to the Reactive-Streams specification by handling backpressure and cancellation correctly; no safeguards are provided by the Flowable itself.
From the specification:
In response to a call to
Publisher.subscribe(Subscriber)the possible invocation sequences for methods on theSubscriberare given by the following protocol:
onSubscribe onNext* (onError | onComplete)?This means that
onSubscribeis always signalled, followed by a possibly unbounded number ofonNextsignals (as requested bySubscriber) followed by anonErrorsignal if there is a failure, or anonCompletesignal when no more elements are available—all as long as theSubscriptionis not cancelled.
What's wrong with the standard create() method, or using Observable.create if you don't need backpressure?
@slnowak, yup, if you don't care about backpressure — just use Flowable.create( { emitter -> {}, BackpressureStrategy.MISSING).
However if you're writing some kind of API it's probably better to allow consumers to pass backpressure strategy that they want as a parameter in each particular use case.
if you don't care about backpressure — just use
Flowable.create( { emitter -> {}, BackpressureStrategy.MISSING)
That's a bad advice @artem-zinnatullin , MISSING means "I know what I'm doing, this won't overflow anything" or "I'll apply a custom onBackpressureX operator later".
Well, there are cases when you don't "care" about backpressure, but need Flowable, i.e. when you need ReactiveStreams compatible type.
Isn't
"don't care about backpressure"
~=
"I know what I'm doing, this won't overflow anything" or "I'll apply a custom onBackpressureX operator later"
?
Though, @akarnokd, I think your comment ^ is still very helpful here since this issue will probably receive fair amount of Google search hits and discussion will help readers more carefully think about backpressure and RxJava/RS type system.
Are you trying to convince me right now that getting NPE is expected behavior? ;) Because that's what the issue all about. I shouldn't mention backpressure in my first post at all, just wanted to let you guys know there's probably a bug in 'createUnsafe'.
But I really appreciate the discussion :)
You get NPE because you didn't call onSubscribe first. When using unsafeCreate, it is your duty to follow the protocol of the target reactive type. Also unsafeCeate is for advanced use and you should definitely use create() instead.
Ok, thanks for help! You can close the issue :)
NPE to communicate one is not following protocol is the cleanest solution :)
Using a method having unsafe clearly in its name is not the cleanest way programming either.