In the init method, ObserveOnSubscriber::requested is updated but Subscriber::requested is not.
I think it should be something like this:
localChild.setProducer(new Producer() {
    @Override
    public void request(long n) {
        if (n > 0L) {
            BackpressureUtils.getAndAddRequest(requested, n);
            ObserveOnSubscriber.this.request(requested.get());
            schedule();
        }
    }
});
Otherwise, subscribers that wrap ObserveOnSubscriber (which in turn wraps a subscriber whose requested value is NOT_SET) get bufferSize as their requested value.
Why do you think that?
That's why: http://stackoverflow.com/questions/35343474/why-does-it-need-onbackpressure-here-for-click-events. The code there seems OK, and I spent some time in the debugger and saw that requested is 16 when it should be NOT_SET.
That's not the problem with the OP's question. His setup didn't properly request more data if a value was mapped to an exception and then dropped.
ObserveOn decouples the upstream requesting and the downstream requesting. It requests a predetermined amount every time and is not affected by how much the downstream is actually requesting. Your suggested change over-requests from upstream and will lead to MissingBackpressureException much earlier.
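To make the over-requesting concrete, here is a rough sketch that only models the arithmetic of the suggested change. It does not use RxJava at all. The class and method names are made up for illustration, and it assumes no items are emitted (so requested is never decremented) between the downstream request calls.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone model, not the real OperatorObserveOn code.
public class RequestDecouplingDemo {

    // What the downstream actually asked for in total.
    public static long actualDemand(long[] downstreamRequests) {
        long sum = 0;
        for (long n : downstreamRequests) {
            sum += n;
        }
        return sum;
    }

    // Models the suggested change: add n to the running total, then
    // forward the whole running total upstream on every call.
    public static long forwardedWithSuggestedChange(long[] downstreamRequests) {
        AtomicLong requested = new AtomicLong();
        long sentUpstream = 0;
        for (long n : downstreamRequests) {
            if (n > 0L) {
                requested.addAndGet(n);          // stands in for getAndAddRequest(requested, n)
                sentUpstream += requested.get(); // stands in for request(requested.get())
            }
        }
        return sentUpstream;
    }

    public static void main(String[] args) {
        long[] requests = {1, 1, 1};
        System.out.println("downstream demand:  " + actualDemand(requests));
        System.out.println("forwarded upstream: " + forwardedWithSuggestedChange(requests));
    }
}
```

With three request(1) calls the downstream demand is 3, but the running totals 1, 2 and 3 are each forwarded, so upstream sees 6. That extra amount comes on top of the fixed batch ObserveOn already requests for its own buffer.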
Thanks, that is quite tricky =)