RxJava 2.2.6: how to buffer until timeout?

Created on 20 Feb 2019  路  10Comments  路  Source: ReactiveX/RxJava

I'd like a buffer operator that emits a List of items when either of:

  • a count is reached
  • or X milliseconds have elapsed since the most recent item was added to the buffer. (as opposed to emitting at fixed intervals, which is what the current buffer operators seem to do)

I've been looking at the different buffering operators for an observable but none of them seem to work well for my use case.

How do I achieve this?

Thanks!

2.x Question

Most helpful comment

I've had use for such an operator myself so I might have a look at it soonish for publishing in rxjava2-extras.

All 10 comments

We don't have an operator for it but Project Reactor does.

Is there a way to mimic the same behavior in RxJava? Using the buffer(boundary) overload for example?

The best is to write/port a custom operator as the count/timer cross reset is really complicated to achieve with composition. The functionality is rarely needed thus did not reach the threshold for becoming a standard RxJava operator.

The operator you suggested from Project Reactor is not what I need either. It emits items based on the time elapsed since the oldest entry in the buffer, not the most recent one.

I've had use for such an operator myself so I might have a look at it soonish for publishing in rxjava2-extras.

I've made this operator in https://github.com/davidmoten/rxjava2-extras/pull/17 as specified by @lalithsuresh using composition with a new operator insert that does that emit something when source hasn't emitted for a while bit. I'll stew on it a few days then request review from anyone willing.

@davidmoten When would this operator be available to use... I am not able to derive a solution to https://github.com/ReactiveX/RxJava/issues/6423 yet. This looks promising..

Review welcome of PR https://github.com/davidmoten/rxjava2-extras/pull/17, thanks. @DareUrDream I'll release next week depending on reviews.

@lalithsuresh, I had the exact same use case earlier. My original attempt was with Observable.amb():

protected <R> Observable<List<R>> groupOperations(@NonNull Observable<R> source) {
    return source.publish(stream -> stream.buffer(
        Observable.amb(
                stream.debounce(groupingTimeout, TimeUnit.MILLISECONDS),
                stream.skip(groupMaxSize - 1))
            .first() // Complete observable after the first reached trigger
            .repeatWhen(observable -> observable))); // Resubscribe immediately for the next buffer
}

This doesn't work, as it doesn't buffer the last emitted item from source if groupMaxSize equals total number of items emitted by source observable so far -- that last item is lost. I never fully understood why that happens, so I'd be interested if anyone can see what's wrong with that approach.

This is what I'm using currently for the purpose:

protected <R> Observable<List<R>> groupOperations(@NonNull Observable<R> source) {
    return source.publish(stream -> stream.buffer(
        Observable.merge(
                stream.window(groupMaxSize).skip(1),
                stream.debounce(groupingTimeout, TimeUnit.MILLISECONDS))))
        .filter(list -> !list.isEmpty());
}

This isn't a perfect solution either: when a triggered debounce is followed with a window closing, we'll get two partial buffers one after the other (instead of one larger buffer). Two semi-full buffers isn't an issue for my use case though, so I've kept this one. Currently in use here: reark/data/stores/cores/ContentProviderStoreCoreBase.java#L172.

Thanks @apoi . I'm currently using the solution that @davidmoten implemented in https://github.com/davidmoten/rxjava2-extras/pull/17. It works like a charm!

Was this page helpful?
0 / 5 - 0 ratings

Related issues

nltran picture nltran  路  4Comments

archenroot picture archenroot  路  3Comments

aballano picture aballano  路  3Comments

yubaokang picture yubaokang  路  3Comments

francorolando picture francorolando  路  3Comments