RxJava 2.x: Observable.interval() - why doesn't it have backpressure (Flowable) now (after 1.x)?

Created on 14 Oct 2016 · 6 Comments · Source: ReactiveX/RxJava

I used to use Observable.interval() with Observable.zip() and deal with backpressure. Now that backpressure is implemented via Flowable, Observable.interval() still returns Observable, not Flowable.
How can it be an Observable when it is a hot source?

All 6 comments

You mixed up a few concepts here. In 2.x the backpressure-enabled type is Flowable. Observable doesn't have any backpressure. Why would Observable.interval() return Flowable? In addition, interval is cold because it only starts to tick when a Subscriber/Observer subscribes to it.

@akarnokd, oh well, I didn't expect Flowable.interval() to exist.
Yeah, you are right, it's cold. What I meant is that interval() will not wait for the Subscriber to finish its work; it emits the next value anyway if intervalMs is smaller than the time Subscriber.onNext() needs.
So, if onNext() takes a long time, what will Observable.interval() do with Observable.zip() after that, given that it has no backpressure? Will it drop everything except the last value or...?

Nothing is dropped there; the values fill up the internal buffer of zip(). If zip is delayed too much, you'll get an OutOfMemoryError.
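
To make the buffer growth concrete, here is a minimal plain-Java model of it. This is not RxJava's implementation: the class `ZipBufferModel`, the method `pendingAfter` and the single `ArrayDeque` are hypothetical stand-ins for one of zip's per-source queues, and time is simulated in a loop rather than driven by a scheduler.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified, hypothetical model of an unbounded per-source queue.
// It is NOT how RxJava implements zip(); it only shows the growth pattern.
public class ZipBufferModel {

    /**
     * Simulates totalMs milliseconds. A ticker enqueues one value every
     * tickPeriodMs; the other side (a slower source or a slow consumer)
     * removes one value every consumePeriodMs. Returns the number of values
     * still waiting in the queue at the end.
     */
    static int pendingAfter(int totalMs, int tickPeriodMs, int consumePeriodMs) {
        Queue<Long> queue = new ArrayDeque<>(); // unbounded: nothing is ever dropped
        long nextTick = 0;
        for (int t = 1; t <= totalMs; t++) {
            if (t % tickPeriodMs == 0) {
                queue.add(nextTick++);          // the ticker never waits for the consumer
            }
            if (t % consumePeriodMs == 0 && !queue.isEmpty()) {
                queue.poll();                   // one value leaves per consume period
            }
        }
        return queue.size();
    }

    public static void main(String[] args) {
        // 1 ms ticks, one value consumed every 10 ms, over one simulated second
        System.out.println(pendingAfter(1000, 1, 10));
    }
}
```

With these numbers, 1000 values are enqueued and only 100 are consumed, so 900 remain queued after one simulated second. The backlog keeps growing linearly for as long as the imbalance lasts, which is the path to the OutOfMemoryError mentioned above.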

@akarnokd, have I understood correctly that 2.x Observable.zip(source) now works like 1.x Observable.zip(source.onBackpressureBuffer()), keeping all unconsumed data inside itself?

2.x Observable.zip uses an unbounded buffer instead of a bounded one. There is no additional buffer like with 1.x zip+onBackpressureBuffer.
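
If bounded buffering is what you need, one option is to stay on the Flowable side. The sketch below assumes RxJava 2.x (the `io.reactivex` package) on the classpath. The class name `IntervalBackpressureSketch`, the method `zipWithDrop`, the 1 ms and 20 ms periods and the choice of `onBackpressureDrop()` are illustrative assumptions, not something prescribed in this thread.

```java
import io.reactivex.Flowable;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a fast Flowable.interval() zipped with a slower one.
public class IntervalBackpressureSketch {

    /** Collects the first `count` zipped pairs and returns them as strings. */
    static List<String> zipWithDrop(int count) {
        // Fast ticker. Without a backpressure operator, Flowable.interval()
        // signals MissingBackpressureException once the downstream stops requesting.
        Flowable<Long> fast = Flowable.interval(1, TimeUnit.MILLISECONDS)
                .onBackpressureDrop(); // discard ticks that zip has not requested

        // Slow ticker, standing in for whatever holds the zip back.
        Flowable<Long> slow = Flowable.interval(20, TimeUnit.MILLISECONDS);

        // Flowable.zip requests a bounded batch from each source
        // (Flowable.bufferSize(), 128 by default) instead of buffering everything.
        return Flowable.zip(fast, slow, (f, s) -> f + ":" + s)
                .take(count)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(zipWithDrop(5));
    }
}
```

Because zip requests its first batch up front, the earliest fast ticks are buffered rather than dropped; dropping only starts once that bounded batch is full. `onBackpressureLatest()` or `onBackpressureBuffer()` can be substituted depending on whether stale ticks matter.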

I got it. Thanks for such a fast reply :+1:
