RxSwift Backpressure

Created on 10 Aug 2017 · 14 Comments · Source: ReactiveX/RxSwift

Hey -
I'm coming over from RxJava land, and the biggest notable difference is the lack of any backpressure mechanisms. I haven't been able to find any real discussions around this online, so I figured I'd start a dialogue around this here.

Is backpressure support a planned addition to RxSwift? If not, why?

In the meantime, what's the most idiomatic way to deal with a fast producer and a slow consumer? What exactly happens in this case? Do the emitted items get buffered internally (like in RxJava), or dropped?

All 14 comments

Hi, @eliasbagley.

To be honest, I've never needed this in RxSwift; it's hard for me to imagine where in iOS or macOS development you would need to handle this kind of problem.

For the moment, I would probably use the throttle operator if I had this problem.

I think @kzaher is best placed to answer whether we are going to support it :)

@eliasbagley Do you have a specific use-case where you need that specific handling ?

Backpressure is a general name for the strategy of reducing fast producer related situations. throttle, debounce, window and buffer are all part of that family of operators: http://reactivex.io/documentation/operators/backpressure.html

If you're referring to onBackpressureBuffer that seems specific to RxJava/Scala/Groovy and not a "ReactiveX" thing.
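For anyone comparing with RxJava, here is a rough sketch of three of those operators in RxSwift. It assumes RxSwift 5 or later (where time intervals are written as `.milliseconds(…)` / `.seconds(…)`); `fastSource` and the collecting arrays are made-up names for illustration, not anything from this issue.

```swift
import Foundation
import RxSwift

let scheduler = MainScheduler.instance
let disposeBag = DisposeBag()

// Hypothetical fast producer: one tick every 5 ms (roughly 200 Hz).
let fastSource = Observable<Int>.interval(.milliseconds(5), scheduler: scheduler)

var throttled: [Int] = []
var sampled: [Int] = []
var batches: [[Int]] = []

// throttle: emissions are spaced at least 500 ms apart;
// elements arriving in between are dropped.
fastSource
    .throttle(.milliseconds(500), scheduler: scheduler)
    .subscribe(onNext: { throttled.append($0) })
    .disposed(by: disposeBag)

// sample: each time the sampler ticks, emit the most recent source element.
fastSource
    .sample(Observable<Int>.interval(.seconds(1), scheduler: scheduler))
    .subscribe(onNext: { sampled.append($0) })
    .disposed(by: disposeBag)

// buffer: nothing is dropped; elements arrive as arrays that are flushed
// every second or as soon as 100 elements have accumulated.
fastSource
    .buffer(timeSpan: .seconds(1), count: 100, scheduler: scheduler)
    .subscribe(onNext: { batches.append($0) })
    .disposed(by: disposeBag)

// Keep a command-line process alive long enough to collect some elements.
RunLoop.main.run(until: Date().addingTimeInterval(3))
print(throttled.count, sampled.count, batches.map { $0.count })
```

Note that throttle and sample are lossy, while buffer only changes the shape of the stream. None of them lets the consumer signal demand upstream the way RxJava's backpressure does.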

I'm still interested to learn what the specific situation is where you find yourself needing this. A code example would be awesome 💯

I don't have a specific use case I need it for right now; I'm just curious and trying to wrap my head around the differences between RxSwift and RxJava.

@eliasbagley Most things are the same, but eventually every implementation of ReactiveX likes to do some things differently and the maintainers can choose to make some things a bit different, like in RxJava's case.

I think for now we can close this issue, but if you have any specific case where that sort of method could be useful, we could consider the effort needed to add it.

Also feel free to join our Slack channel for more specific questions!

@freak4pc I have a situation where backpressure handling is necessary: I'm handling inserts/moves/deletes in a table view (AsyncDisplayKit, so I can't use RxCocoa bindings), and there is an animation sequence during which I need to stop inserting/moving/deleting and wait until the animation finishes. After that, I want to process whatever inserts/moves/deletes were queued while the animation was happening. I think RxSwiftExt's pausableBuffered would solve the issue, but it'd be great if the main library had support for backpressure.

Thanks for the example, @sidmani!
Can you share an RxJava or similar backpressure example solving what you just described?

@freak4pc I'm not too familiar with RxJava, but the docs seem to indicate that onBackpressureBuffer (RxJava 1.0) would solve this. RxJS's methods, which RxSwiftExt/pausable is based on (as far as I know) seem better suited though.

@sidmani That doesn't sound like backpressure at all, though. You want to hold onto emissions for some period while something else happens (e.g. an animation, which could be a second observable). That definitely does sound like pausableBuffered, which is part of RxSwiftExt, actually.

Well, pausableBuffered is categorized under backpressure in RxJS, and technically this situation does deal with an observable that emits events faster than they're handled.

I'll repeat this again :)
So are Throttle, Debounce, Window, Buffer and Sample.

pausableBuffered is only an RxJS operator. It's not used across ReactiveX projects.
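For the table-view case above, this is roughly how pausableBuffered from RxSwiftExt could be wired up. It is a sketch only: `TableUpdate`, `updates`, `isIdle` and `applied` are hypothetical names, and the comments reflect my reading of RxSwiftExt's documentation (the source flows while the pauser's latest value is `true` and is queued while it is `false`).

```swift
import RxSwift
import RxSwiftExt

// Hypothetical model of the table-view changes from the use case above.
enum TableUpdate: Equatable {
    case insert(Int)
    case move(Int, Int)
    case delete(Int)
}

let updates = PublishSubject<TableUpdate>()
// Assumed convention: false while an animation runs, true once it has finished.
let isIdle = BehaviorSubject<Bool>(value: true)
let disposeBag = DisposeBag()

var applied: [TableUpdate] = []

updates
    // limit: nil keeps every update queued while paused
    // (the default limit keeps only the most recent one).
    .pausableBuffered(isIdle, limit: nil)
    .subscribe(onNext: { update in
        // In a real app this is where the table view would be updated.
        applied.append(update)
    })
    .disposed(by: disposeBag)

updates.onNext(.insert(0))    // idle, so this goes straight through
isIdle.onNext(false)          // animation starts
updates.onNext(.move(0, 3))   // queued while the animation runs
updates.onNext(.delete(1))    // queued while the animation runs
isIdle.onNext(true)           // animation ends, queued updates are flushed in order
```

With `limit: nil` the queue is unbounded, so this only makes sense when pauses are short, as they would be for an animation.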

Hello, I would like to present my use case. I'm dealing with a Bluetooth IoT device that sends an array of 20 bytes at 200 Hz. On Android, I have been able to solve the backpressure problem with an explicit ring buffer of 500 items (onBackpressureBuffer(500)), which holds data for almost 2.5 seconds.

This is the code in Java:

                ...
                .flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
                    @Override
                    public Observable<byte[]> call(Observable<byte[]> observable) {
                        return observable;
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        promise.resolve(null);
                        transactions.removeSubscription(transactionId);
                    }
                })
                .onBackpressureBuffer(500) // <---- just before the call to subscribe
                .subscribe(new Observer<byte[]>() {

I'll need to do the same for iOS.
Could someone point me to some guidance on how to achieve the same behavior?

Thanks in advance.

@omatrot I don't quite understand your problem, but it seems you want to hold those 500 items for 2.5 seconds before sending them. If so, you may want to check out the buffer operator.
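If what is needed is closer to the 500-item ring buffer described above (500 items at 200 Hz is 2.5 seconds of data), one option that needs no Rx support at all is a small bounded buffer sitting between the Bluetooth callback and the slow consumer. The sketch below is plain Swift, not an RxSwift API; `BoundedBuffer` and its members are hypothetical names. It also assumes a drop-oldest policy on overflow, which differs from RxJava's onBackpressureBuffer(500), which as far as I know signals an error when the buffer overflows.

```swift
import Foundation

/// Fixed-capacity FIFO buffer that overwrites the oldest element when full.
/// Guarded by a lock so one thread can push while another drains.
final class BoundedBuffer<Element> {
    private var storage: [Element?]
    private var head = 0          // index of the oldest element
    private var count = 0         // number of elements currently stored
    private let lock = NSLock()
    /// Number of elements overwritten so far (written under the lock).
    private(set) var dropped = 0

    init(capacity: Int) {
        precondition(capacity > 0, "capacity must be positive")
        storage = Array(repeating: nil, count: capacity)
    }

    func push(_ element: Element) {
        lock.lock(); defer { lock.unlock() }
        let capacity = storage.count
        if count == capacity {
            // Full: the next write slot is the oldest element, so overwrite it.
            storage[head] = element
            head = (head + 1) % capacity
            dropped += 1
        } else {
            storage[(head + count) % capacity] = element
            count += 1
        }
    }

    /// Removes and returns everything currently buffered, oldest first.
    func drain() -> [Element] {
        lock.lock(); defer { lock.unlock() }
        var result: [Element] = []
        result.reserveCapacity(count)
        for offset in 0..<count {
            let index = (head + offset) % storage.count
            if let element = storage[index] { result.append(element) }
            storage[index] = nil
        }
        head = 0
        count = 0
        return result
    }
}

// Usage: 500 slots, each holding one 20-byte packet.
let packets = BoundedBuffer<[UInt8]>(capacity: 500)
packets.push([UInt8](repeating: 0, count: 20))
let pending = packets.drain()
```

The producer side could call `push` from a subscription's onNext, and the consumer could call `drain` at whatever pace it can sustain.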

@freak4pc I have a situation where backpressure handling is necessary: I'm handling inserts/moves/deletes in a table view (AsyncDisplayKit, so I can't use RxCocoa bindings), and there is an animation sequence during which I need to stop inserting/moving/deleting and wait until the animation finishes. After that, I want to process whatever inserts/moves/deletes were queued while the animation was happening. I think RxSwiftExt's pausableBuffered would solve the issue, but it'd be great if the main library had support for backpressure.

Hey! Did you ever find a solution to this? I'm struggling with the same thing using RxASDataSources.
