Rxjava: OperatorSwitch Backpressure

Created on 5 Apr 2016  路  5Comments  路  Source: ReactiveX/RxJava

Debugging an issue with switchMap led me to reviewing the OperatorSwitch code. I think it might be wrong in its approach on backpressure (at least when used as switchMap), but before attempting code wanted to discuss conceptually. (Note that I have not considered switchOnNext or switchOnEmpty where the current approach may be fine.)

Right now it appears to pass-thru the requestN value from downstream to the upstream. This doesn't seem correct for a switchMap case. Instead I think it should be have as follows:

  • request 2 on initial subscribe
  • map starts on first received onNext
  • downstream request(n) is propagated to the inner Observable
  • onNext invoked a second time with new Observable
  • request(1) upwards to allow the upstream to send a new Observable
  • propagate remaining request(n) from downstream to the new inner

In other words, quite similar to how concatMap works for maintaining the request(n) value across inner Observables, except that it unsubscribes the previous inner on receiving a new Observable.

Question

All 5 comments

cc @jspahrsummers

The OperatorSwitch does unbounded-in on the outer Observable but keeps track of the emission from each inner Observable. When the switch happens, all unfulfilled request will be forwarded to the new inner Observable.

That's not the behavior I'm seeing. Looks like I need to create a unit test.

... and of course can't reproduce in a simple unit test. Confusing bloody bug. I'll re-open another legit issue if I can track it down.

Thanks @akarnokd

Could you share a little bit more about your issue? What operators were involved? Was heavy switching involved with async sources or consumption?

Was this page helpful?
0 / 5 - 0 ratings