Kotlinx.coroutines: Extra event still handled even with `.buffer(0, BufferOverflow.DROP_LATEST)`

Created on 16 Nov 2020 · 5 Comments · Source: Kotlin/kotlinx.coroutines

Library Version: 1.4.0-M1

I am trying to do:

 2sec     2sec     2sec
------[A]------[B]------[C]------...----------------> InitialFlow
       \        |        | 
        \      drop      drop
         \
     5sec \    5sec        5sec
----------[1]---------[2]---------[3]-----|> AnotherFlow
result: [A1, A2, A3]

So I have InitialFlow, which emits an item every 2 seconds. Each item is then transformed into AnotherFlow, which takes longer to finish (15 seconds in total). I would like to drop any further items emitted by InitialFlow while AnotherFlow has not finished.

I've tried doing this:

flow {
    delay(2000)
    emit("A")
    delay(2000)
    emit("B")
    delay(2000)
    emit("C")
}.buffer(0, BufferOverflow.DROP_LATEST)
    .onEach {
        println("Event for $it")
    }
    .flatMapConcat {
        flow {
            delay(5000)
            emit("${it}1")
            delay(5000)
            emit("${it}2")
            delay(5000)
            emit("${it}3")
        }
    }
    .onEach {
        println(it)
    }
    .launchIn(scope)

But for some reason this is the result:

Event for A
A1
A2
A3
Event for B
B1
B2
B3

It still processes Event B for some reason, even though I have .buffer(0, BufferOverflow.DROP_LATEST).

Is this the intended behavior?

question

All 5 comments

Documentation on buffer says:

To implement either of the custom strategies, a buffer of at least one element is used.

You're configuring a capacity of 0, meaning the buffer is RENDEZVOUS: the emitter suspends and waits for the collector if the buffer is full, and the collector suspends and waits for an emission if the buffer is empty. However, you are also using a custom strategy, and the documentation on DROP_OLDEST and DROP_LATEST says that a buffer of at least one element is used and that the emitter never suspends. So that the emitter never has to suspend when the buffer is full, the buffer either drops the latest value, leaving its contents intact (DROP_LATEST), or makes room by dropping the oldest value and keeping the latest (DROP_OLDEST).

This leads me to believe your buffer capacity is being overridden and is actually one. Custom strategies do not make sense with a buffer of capacity 0. I have run your code with capacity=1, and the behavior is the same as with capacity=0.

But why does it still print A and B? A passes through the buffer immediately to the next onEach, which runs in a different coroutine from the emitter. Before the downstream operations complete, B is emitted and stored in the buffer. Then C is emitted, but the buffer of capacity 1 is already full with B, so C is dropped.
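The timeline described above can be replayed with a small hand-written model. This is only a sketch in plain Kotlin, not kotlinx.coroutines code: `Emission`, `handledValues`, and their parameters are hypothetical names, and the "at least one element" rule is taken from the documentation quoted in this thread rather than from the library source.

```kotlin
// Hand-written model for illustration only; none of these names come from kotlinx.coroutines.
data class Emission(val value: String, val atSeconds: Int)

/**
 * Models one slow collector sitting behind a DROP_LATEST buffer.
 * Returns the upstream values the collector ends up handling, in order.
 */
fun handledValues(
    emissions: List<Emission>,
    requestedCapacity: Int,
    processingSeconds: Int,
): List<String> {
    // Assumption from the thread: a drop strategy uses a buffer of at least one element.
    val capacity = maxOf(requestedCapacity, 1)
    val buffer = ArrayDeque<String>()
    val handled = mutableListOf<String>()
    var busyUntil = 0 // time at which the collector becomes free again

    fun startHandling(value: String, now: Int) {
        handled += value
        busyUntil = now + processingSeconds
    }

    for (e in emissions.sortedBy { it.atSeconds }) {
        // Let the collector pick up buffered values once it is free, before this emission.
        while (buffer.isNotEmpty() && busyUntil <= e.atSeconds) {
            startHandling(buffer.removeFirst(), busyUntil)
        }
        when {
            // Collector is idle: the value is handed over directly.
            busyUntil <= e.atSeconds && buffer.isEmpty() -> startHandling(e.value, e.atSeconds)
            // Collector is busy but there is room: the value waits in the buffer.
            buffer.size < capacity -> buffer.addLast(e.value)
            // DROP_LATEST: the new value is discarded and the buffer stays intact.
            else -> Unit
        }
    }
    // After the last emission the collector still drains whatever is buffered.
    handled += buffer
    return handled
}

fun main() {
    // A, B, C arrive 2 seconds apart; each one keeps the collector busy for 15 seconds.
    val emissions = listOf(Emission("A", 2), Emission("B", 4), Emission("C", 6))
    println(handledValues(emissions, requestedCapacity = 0, processingSeconds = 15)) // prints [A, B]
    println(handledValues(emissions, requestedCapacity = 1, processingSeconds = 15)) // prints [A, B]
}
```

In this model a requested capacity of 0 and a capacity of 1 give the same result, which matches the behavior reported in the thread: A is handled at once, B waits in the single buffer slot, and C is dropped.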

It looks to me like it is working as designed, but the documentation should be clearer about the incompatibility between a buffer size of 0 and a custom strategy. Lint warnings might help as well.

I think the documentation has it covered. See https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html

onBufferOverflow - configures an action on buffer overflow (optional, defaults to SUSPEND, supported only when capacity >= 0 or capacity == Channel.BUFFERED, implicitly creates a channel with at least one buffered element).

Does it help?

Not OP, but I wonder why a check such as the following is not done for capacity=RENDEZVOUS and onBufferOverflow=DROP_LATEST or DROP_OLDEST:


require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
    "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
}
require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
    "CONFLATED capacity cannot be used with non-default onBufferOverflow"
}

That's because RENDEZVOUS capacity is not a special mode, but just a shorthand for bufferSize = 0. We wanted to make sure it fuses nicely with any buffer overflow strategy, so that a call to flow.buffer(onBufferOverflow=DROP_LATEST) always works in a reasonable way regardless of the implementation details of the upstream flow, even if the upstream flow happens to be configured with .buffer(0).
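As a rough illustration of that scenario, the sketch below applies `buffer(onBufferOverflow = BufferOverflow.DROP_LATEST)` to an upstream flow that already ends in `.buffer(0)`. It assumes the kotlinx-coroutines-core dependency is on the classpath; `upstreamWithRendezvous` and `collectDroppingLatest` are hypothetical names made up for this example. Which values get dropped depends on timing, so no exact output is claimed.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical upstream whose author happened to configure .buffer(0) internally.
fun upstreamWithRendezvous(): Flow<Int> = flow {
    for (i in 1..5) emit(i)
}.buffer(0)

// Collects slowly through a DROP_LATEST buffer and returns the values that arrived.
fun collectDroppingLatest(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    upstreamWithRendezvous()
        .buffer(onBufferOverflow = BufferOverflow.DROP_LATEST)
        .collect { value ->
            delay(50) // slow collector, so the emitter outpaces it
            received += value
        }
    received
}

fun main() {
    // Expect a prefix-like subset of 1..5; the rest were dropped instead of suspending the emitter.
    println(collectDroppingLatest())
}
```

The caller does not need to know that the upstream flow uses `.buffer(0)`: the emitter is not suspended by the slow collector, and values that do not fit in the buffer are dropped.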

Alright, thank you very much. I'll be closing this issue now.
