RxJava: Probable bug in ObserveOnSubscriber

Created on 28 Apr 2016 · 4 Comments · Source: ReactiveX/RxJava

In the init method, ObserveOnSubscriber::requested is updated, but Subscriber::requested is not.

I think it should be something like this:

localChild.setProducer(new Producer() {

    @Override
    public void request(long n) {
        if (n > 0L) {
            BackpressureUtils.getAndAddRequest(requested, n);
            ObserveOnSubscriber.this.request(requested.get());
            schedule();
        }
    }

});

Otherwise, subscribers that wrap an ObserveOnSubscriber (which in turn wraps a subscriber whose requested value is NOT_SET) get bufferSize as their requested value.

All 4 comments

Why do you think that?

That's why: http://stackoverflow.com/questions/35343474/why-does-it-need-onbackpressure-here-for-click-events. The code there seems OK, and I spent some time in the debugger and saw that requested is 16 when it should be NOT_SET.

That's not the problem with the OP's question. His setup didn't properly request more data if a value was mapped to an exception and then dropped.

ObserveOn decouples the upstream requesting from the downstream requesting. It requests a predetermined amount from upstream every time and is not affected by how much the downstream is actually requesting. Your suggested change over-requests from upstream and will lead to MissingBackpressureException much earlier.
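To make the over-requesting point concrete, here is a minimal sketch in plain Java with no RxJava dependency. It only models the arithmetic of forwarding the running total, as the line ObserveOnSubscriber.this.request(requested.get()) in the proposal would do. The class and method names are hypothetical, and the sketch assumes that no items are emitted between the downstream calls, so the counter is never decremented. It also leaves out the bufferSize that ObserveOn already requests up front.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration class; not part of RxJava.
public class RequestForwardingSketch {

    /**
     * Total amount asked of upstream when every downstream request(n)
     * forwards the accumulated counter instead of just n.
     * Assumption: nothing is emitted in between, so the counter only grows.
     */
    public static long forwardCumulativeTotal(long... downstreamRequests) {
        AtomicLong requested = new AtomicLong(); // downstream demand so far
        long upstreamTotal = 0L;
        for (long n : downstreamRequests) {
            if (n > 0L) {
                requested.addAndGet(n);
                // Models ObserveOnSubscriber.this.request(requested.get())
                upstreamTotal += requested.get();
            }
        }
        return upstreamTotal;
    }

    /** Total amount the downstream actually asked for. */
    public static long downstreamTotal(long... downstreamRequests) {
        long total = 0L;
        for (long n : downstreamRequests) {
            if (n > 0L) {
                total += n;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        long[] calls = {1L, 1L, 1L};
        System.out.println("downstream asked for: " + downstreamTotal(calls));
        System.out.println("upstream was asked for: " + forwardCumulativeTotal(calls));
    }
}
```

With three downstream calls of request(1), the downstream has asked for 3 items in total, while the upstream has been asked for 1 + 2 + 3 = 6. Under these assumptions the gap grows with every further call, which is the kind of over-requesting described above.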

Thanks, that is quite tricky =)
