I have been implementing a natural buffering operator as discussed in https://github.com/Kotlin/kotlinx.coroutines/issues/902 and just discovered an edge case issue.
The code we have is very similar to what I posted in the other issue:
@ExperimentalCoroutinesApi
suspend fun <T> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> = channelFlow<List<T>> {
    require(maxChunkSize >= 1) {
        "Max chunk size should be greater than 0 but was $maxChunkSize"
    }
    buffer(1)
    val bufferChunks = ArrayList<T>(maxChunkSize)
    collect {
        bufferChunks += it
        if (bufferChunks.size < maxChunkSize) {
            val offered = offer(bufferChunks.toList())
            if (offered) {
                bufferChunks.clear()
            }
        } else {
            send(bufferChunks.toList())
            bufferChunks.clear()
        }
    }
    if (bufferChunks.size > 0) send(bufferChunks.toList())
}.buffer(0)
This has been working very well for us, but here is the issue I just uncovered: given a buffer that is not yet full, if I offer the buffer while the downstream consumer is busy, the code waits until it can collect an additional element before offering to the downstream consumer again, even though the next element may take a long time to arrive and the consumer might become free in the meantime. So is there a way to check whether an item is queued, so that I could keep offering the incomplete buffer until I can collect a new item? Ideally my code could then look something like this:
...
var offered = false
while (hasNoItemsQueued && !offered) {
    offered = offer(bufferChunks.toList())
}
...
where hasNoItemsQueued would be a variable in the scope provided by the collect method. This is, by the way, just for illustration purposes; in the end I do not care how exactly this would be implemented.
I have been able to work around this in a rather hacky and complicated way (just for a prototype), using an onEach that runs before the collect in a different context and sets a shared variable. But that feels quite ugly, and I am not even sure that approach is free of issues.
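For reference, a rough sketch of what I mean by that hack (bufferedChunksHack and the AtomicBoolean flag are just my illustration, and the flag is inherently racy, which is part of why it feels so ugly):
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
@ExperimentalCoroutinesApi
fun <T> Flow<T>.bufferedChunksHack(maxChunkSize: Int): Flow<List<T>> = channelFlow<List<T>> {
    val itemQueued = AtomicBoolean(false)
    val bufferChunks = ArrayList<T>(maxChunkSize)
    this@bufferedChunksHack
        .onEach { itemQueued.set(true) }  // flag flips as soon as the upstream produces
        .flowOn(Dispatchers.Default)      // run the onEach in a different context
        .buffer(1)
        .collect {
            itemQueued.set(false)
            bufferChunks += it
            if (bufferChunks.size >= maxChunkSize) {
                send(bufferChunks.toList())
                bufferChunks.clear()
            } else {
                // busy-wait: keep re-offering the incomplete buffer while nothing
                // new appears to be queued (another reason this is only a prototype)
                while (!itemQueued.get() && bufferChunks.isNotEmpty()) {
                    if (offer(bufferChunks.toList())) bufferChunks.clear()
                    yield()
                }
            }
        }
    if (bufferChunks.isNotEmpty()) send(bufferChunks.toList())
}.buffer(0)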
I did actually find a way around this by changing the code to:
@ExperimentalCoroutinesApi
suspend fun <T> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> = channelFlow<List<T>> {
    require(maxChunkSize >= 1) {
        "Max chunk size should be greater than 0 but was $maxChunkSize"
    }
    buffer(1)
    val bufferChunks = ArrayList<T>(maxChunkSize)
    produceIn(this).consume {
        while (!isClosedForReceive) {
            val element = receive()
            bufferChunks += element
            if (bufferChunks.size < maxChunkSize) {
                do {
                    val offered = offer(bufferChunks.toList())
                    if (offered) {
                        bufferChunks.clear()
                    }
                    delay(1)
                } while (isEmpty && bufferChunks.size > 0)
            } else {
                send(bufferChunks.toList())
                bufferChunks.clear()
            }
        }
    }
    if (bufferChunks.size > 0) send(bufferChunks.toList())
}.buffer(0)
I am not sure how ideal this is, as we now convert the flow to a channel, or whether that has other implications for the rest of the application, but it does work in my initial tests. It would be great, however, to hear whether that is the best way or if there is some way to achieve this with pure flows.
I just tested this implementation with produceIn, and our average batch size has dropped greatly while performance has gotten much worse. I am not sure if the issue is that we now convert everything to channels or if it is inherent in the added loop. I wonder if there is some other way around it, for example if poll had a time parameter, so it would try receiving a new item for 500ms and, if it did not receive one, we could offer the same buffer again. Or, similarly, if flows had some method like 'do something if we did not receive a new element for x ms'.
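Just to illustrate the kind of API I mean: a timed poll can be approximated today with withTimeoutOrNull (receiveOrNullWithin is a made-up name, and I have not checked what this pattern does to performance):
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
// Sketch of a "poll with timeout": returns the next element, or null if
// nothing arrives within timeoutMillis. Note that receive() throws
// ClosedReceiveChannelException once the channel is closed, which a real
// implementation would need to handle.
suspend fun <T : Any> ReceiveChannel<T>.receiveOrNullWithin(timeoutMillis: Long): T? =
    withTimeoutOrNull(timeoutMillis) { receive() }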
But no matter what, it would be really great if this issue could be solved in some way.
A few notes about your code:
- bufferedChunks shouldn't be a suspend function. All it does is call channelFlow and buffer(), neither of which are suspending calls. Functions that don't make suspending calls shouldn't be marked as suspending, because they force the compiler to unnecessarily generate state machines for callers that don't make any other suspending calls. It's also confusing for people reading the code, because a function that returns a Flow generally shouldn't need to suspend at all.
- The buffer(1) call is a no-op. Flow operators return new flows. Your buffer(1) call isn't doing anything with the return value, so it has no effect. For it to be applied to the current flow, you'd need to do buffer(1).produceIn(this) (see the short illustration below).
- You sleep in your polling loop. Luckily, the channel API is designed so you don't need to do this kind of polling.
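To make the second point concrete, a minimal sketch (the flow here is just a placeholder):
import kotlinx.coroutines.flow.*
fun demo() {
    val source: Flow<Int> = flowOf(1, 2, 3)
    source.buffer(1)                            // no-op: the returned flow is discarded
    val buffered: Flow<Int> = source.buffer(1)  // buffering applies only to this new flow
}
I think this code is roughly equivalent to what you wrote: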
@ExperimentalCoroutinesApi
fun <T : Any> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> = callbackFlow<List<T>> {
    require(maxChunkSize >= 1) {
        "Max chunk size should be greater than 0 but was $maxChunkSize"
    }
    val bufferChunks = ArrayList<T>(maxChunkSize)
    val upstreamChannel = buffer(1).produceIn(this)
    val downstreamChannel = this
    while (!upstreamChannel.isClosedForReceive) {
        // Buffer is full, don't process any more upstream emissions until the
        // buffer has been emitted.
        if (bufferChunks.size >= maxChunkSize) {
            downstreamChannel.send(bufferChunks.toList())
            bufferChunks.clear()
        }
        // Wait for new upstream emissions, but also try to send any buffered
        // items.
        select<Unit> {
            upstreamChannel.onReceiveOrNull {
                // Null means the upstream completed while we were suspended, and
                // the loop will terminate after this. Note that if you need T to
                // be nullable, you'll need to wrap your upstream values in some
                // sort of additional value to distinguish between actual null
                // values and the close sentinel. Hopefully there will eventually
                // be an onReceiveOrClosed method that makes this simpler.
                if (it != null) {
                    bufferChunks += it
                }
            }
            if (bufferChunks.isNotEmpty()) {
                downstreamChannel.onSend(bufferChunks.toList()) {
                    bufferChunks.clear()
                }
            }
        }
    }
    // After upstream completes, flush any remaining items.
    if (bufferChunks.isNotEmpty()) send(bufferChunks.toList())
}.buffer(0)
That said, I think you could also write this with only one channel:
@ExperimentalCoroutinesApi
fun <T> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> = callbackFlow<List<T>> {
    coroutineScope {
        require(maxChunkSize >= 1) {
            "Max chunk size should be greater than 0 but was $maxChunkSize"
        }
        val bufferChunks = ArrayList<T>(maxChunkSize)
        val downstreamChannel = this@callbackFlow
        var sendJob: Job? = null
        collect { item ->
            // Cancel the send job in case the downstream is slow.
            // Need to join on the job to synchronize access to the buffer.
            sendJob?.cancelAndJoin()
            bufferChunks += item
            // Cache the full status of the buffer, since it's not safe to access
            // the buffer after launching the below coroutine until the coroutine
            // has completed.
            val isBufferFull = bufferChunks.size >= maxChunkSize
            // Launch a coroutine to send, so we can still accept more upstream
            // emissions.
            sendJob = launch {
                // Potentially executing on a different thread, but not racy since
                // the main coroutine will not touch the buffer until this job has
                // completed.
                downstreamChannel.send(bufferChunks.toList())
                // Send has atomic cancellation - if the send succeeds, it will
                // not throw a CancellationException even if the job was cancelled.
                bufferChunks.clear()
            }
            if (isBufferFull) {
                // Don't process any more upstream emissions until the
                // buffer has been emitted.
                sendJob!!.join()
            }
        }
    }
}.buffer(0)
@zach-klippenstein thanks for the comments, improvements, and possible solutions. I had started to think of a solution similar to number 2 as well, so it is great to see that I was on the right track there. I wonder if this use case could be made simpler than this solution, but otherwise we do have an answer now, and if the API is not likely to change, this issue could also be closed.
@zach-klippenstein It is unsettling to see a solution that relies on atomic cancellation in Channel.send, given that we are thinking of abolishing this behavior in the next major release. See #1813. It makes me wonder how much other code "in the wild" exploits it in a similar way.
Not a huge fan of that myself, even without the future plans. It's a little too "magic". Atomic cancellation is load-bearing here, but very subtle and not obvious unless you read all the docs.
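To spell out where it is load-bearing: in my sendJob variant above, the correctness of this critical section depends on the current (pre-#1813) semantics. This is an annotated excerpt, not standalone code; the names refer to that snippet:
// With atomic cancellation, cancel-and-send is all-or-nothing: either the job
// is cancelled before the element is delivered, or the send completes and the
// coroutine resumes normally, so bufferChunks.clear() always runs after a
// successful delivery. If #1813 removes that guarantee, the job could be
// cancelled after the element is delivered but before clear() runs, and the
// same items would be re-sent in the next chunk.
sendJob = launch {
    downstreamChannel.send(bufferChunks.toList())
    bufferChunks.clear()
}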
@elizarov I am now using a solution very similar to the latest one suggested by @zach-klippenstein, i.e. the one with the sendJob. Is that something that will be affected by #1813? And if so, is there another proposed solution for this issue?
You need neither two channels nor the select expression for this case, and performance will be much higher. You can use the following approach for natural batching:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.channels.*
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T : Any> Flow<T>.bufferedChunks(maxChunkSize: Int): Flow<List<T>> {
    require(maxChunkSize >= 1)
    return flow<List<T>> {
        coroutineScope {
            val upstreamChannel = this@bufferedChunks.buffer(maxChunkSize).produceIn(this)
            while (true) { // loop until closed
                val bufferChunks = ArrayList<T>(maxChunkSize) // allocate a new array list every time
                // receive the first element (suspend until it is there)
                // null here means the channel was closed -> terminate the outer loop
                val first = upstreamChannel.receiveOrNull() ?: break
                bufferChunks.add(first)
                while (bufferChunks.size < maxChunkSize) {
                    // poll subsequent elements from the channel's buffer without waiting while they are present
                    // null here means there are no more elements or the channel was closed -> break from this loop
                    val element = upstreamChannel.poll() ?: break
                    bufferChunks.add(element)
                }
                emit(bufferChunks)
            }
        }
    }
}
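For a quick smoke test of the operator (the values and delay are arbitrary):
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    (1..10).asFlow()
        .onEach { delay(10) } // a producer that takes some time per element
        .bufferedChunks(4)
        .collect { println(it) } // with a fast consumer, most chunks stay small
}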
@Globegitter Did you end up using a solution from @elizarov? How is your experience? What about performance?
@pacher Not yet. My work is project-based and the bug I described has not been high priority enough so I have currently been assigned to a different project. I do expect to come back to it in the future however.
> You can use the following approach for natural batching: [the code quoted in full above]
I think this method does not do buffering. I was trying it out in a project and the elements in the flow would not get buffered; I am not sure if someone else has seen that. If I get a chance I will try to elaborate more, but at least that was my experience copying the method verbatim.
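In case it helps anyone reproduce this, here is a harness along the lines of what I was running (the numbers are arbitrary). If batching works, the chunk sizes should grow toward maxChunkSize once the consumer is slowed down:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
    (1..20).asFlow()
        .bufferedChunks(5)
        .collect { chunk ->
            println("chunk of ${chunk.size}: $chunk")
            delay(100) // slow consumer; larger chunks should appear if buffering works
        }
}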