It would be useful to have an optional transformation in the Flow.buffer method to aggregate the buffered items, like kotlin.sequences.Sequence.chunked does.
I mean something like:
fun <T, R> Flow<T>.buffer(capacity: Int = BUFFERED, transform: suspend (List<T>) -> R): Flow<R>
Then we can write
runBlocking {
    (1..100).asFlow().buffer(capacity = 10) { it.sum() }.collect { println(it) }
}
with result 55, 155, 255, ... , 955
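For illustration, here is a minimal sketch of such an operator. It is written as a standalone `chunked` (the name the discussion below settles on) rather than an overload of `buffer`, and the emission of a trailing partial chunk is my assumption:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Sketch only: collect `size` elements at a time, apply `transform`, emit the result.
fun <T, R> Flow<T>.chunked(size: Int, transform: suspend (List<T>) -> R): Flow<R> = flow {
    require(size > 0) { "size must be positive" }
    val chunk = ArrayList<T>(size)
    collect { element ->
        chunk += element
        if (chunk.size == size) {
            emit(transform(chunk.toList()))
            chunk.clear()
        }
    }
    // Assumption: a final partial chunk is also transformed and emitted.
    if (chunk.isNotEmpty()) emit(transform(chunk.toList()))
}

fun main() = runBlocking {
    (1..100).asFlow().chunked(10) { it.sum() }.collect { println(it) }
    // prints 55, 155, 255, ..., 955
}
```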
That would be a separate operator that we'll call chunked or something like that (as we generally follow stdlib naming convention). This operator will be totally unrelated to the buffer operator. Unlike buffer, this operator will be fully sequential.
P.S. Rx has a whole set of bufferXxx operators that actually correspond to chunked/windowed in Kotlin. On the other hand, buffer/conflate operators in Kotlin flows somewhat correspond to Rx onBackpressureBuffer operators.
Also, I forgot to ask: what would be your use case for such an operator?
Thank you very much for the response!
About the use case: I need to handle a stream of vectors (CSV rows) from a TCP socket and write aggregated statistics for every chunk of n messages to files.
Are you sure you need Flow for this? Wouldn't a Sequence from Kotlin standard library work for you?
Well, I'm using Ktor as the server for the socket connection and using coroutines to write data to a file. Pseudocode for my task:
launch {
    val socket = server.accept()
    val input = socket.openReadChannel()
    flow {
        while (true) {
            val line = input.readUTF8Line() ?: break // readUTF8Line() returns null at EOF
            emit(line)
        }
    }.map {
        convertToDomainObject(it)
    }.chunked(1000) {
        aggregateToDomainObjects(it)
    }.collect {
        writeToFile(it)
    }
}
Yes, I could use a plain synchronous Sequence. But then I would need to provide a backpressure mechanism manually: what if I receive messages faster than I can store them? Hopefully the chunked() operator will behave like buffer() and provide backpressure out of the box.
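For context on the backpressure point: a plain Flow is sequential by default, so the emitter suspends until the collector finishes each element, and any chunked operator written with collect { ... emit(...) } inherits that behavior for free. A minimal illustration (the counter and timings are just for demonstration):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    var produced = 0
    flow {
        repeat(5) {
            produced++
            emit(it) // suspends until the collector is done with this element
        }
    }.collect {
        delay(10) // slow consumer
        check(produced == it + 1) { "emitter ran ahead of the collector" }
    }
    println("the emitter never ran ahead of the slow collector")
}
```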
@azulkarnyaev Thanks for the explanation. It does make sense.
I would second this issue - I have a use case, where I'm receiving subsequent snapshots of a database and I need to produce classes representing diffs between those snapshots.
So I need to cache two subsequent emissions, emit them as Pair(first, second), remove the first emission, and wait for the third emission to emit another Pair(second, third). With collections, I do get it with windowed function.
I do not control frequency of emissions and they must happen on a background thread -> hence Flow is needed.
I think it could be generalized into something like this:
import kotlinx.coroutines.flow.*
import kotlin.math.max
import kotlin.math.min

fun <T> Flow<T>.windowed(size: Int, step: Int): Flow<List<T>> = flow {
    require(size > 0 && step > 0) { "size and step must both be positive" }
    val queue = ArrayDeque<T>(size)
    // If somebody would like to skip some elements between windows by passing step > size, then why not?
    val toSkip = max(step - size, 0)
    val toRemove = min(step, size)
    var skipped = toSkip // start at toSkip so the first window begins at the first element
    collect { element ->
        if (queue.size < size && skipped == toSkip) {
            queue.add(element)
        } else if (queue.size < size && skipped < toSkip) {
            skipped++
        }
        if (queue.size == size) {
            emit(queue.toList())
            repeat(toRemove) { queue.removeFirst() }
            skipped = 0
        }
    }
}
Intended use:
flow.windowed(size = 2, step = 1)
    .map { listOfTwoNeighboringEmissions ->
        computeDiff(listOfTwoNeighboringEmissions)
    }
@circusmagnus That sounds like a use case for scan more than windowing.
Almost. Scan requires me either to provide an initial value, which I do not have (an empty diff is stupid; I just need to swallow the first DB emission and wait for the second one to produce a diff), or to emit the same type as I am receiving (scanReduce), which is also a no-go, as I get a list of entities but want to emit the changes between them.
Would this work?
flow
    .scan(listOf<Item>()) { oldItems, newItem ->
        if (oldItems.size >= BUFFER_COUNT) listOf(newItem)
        else oldItems + newItem
    }
    .filter { it.size == BUFFER_COUNT }
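Running that scan + filter snippet on a concrete flow (with BUFFER_COUNT = 2 and integers standing in for the real entities) shows what it actually produces: non-overlapping chunks, not overlapping windows, since the accumulator is reset once it reaches BUFFER_COUNT. Note that scan also emits the initial empty accumulator, which the size filter discards:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

const val BUFFER_COUNT = 2

fun main() = runBlocking {
    val result = (1..6).asFlow()
        .scan(listOf<Int>()) { oldItems, newItem ->
            if (oldItems.size >= BUFFER_COUNT) listOf(newItem) // reset once full
            else oldItems + newItem
        }
        .filter { it.size == BUFFER_COUNT }
        .toList()
    println(result) // [[1, 2], [3, 4], [5, 6]]
}
```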
At first glance it should work. I would recommend, however, using the more efficient and streamlined operator outlined in this PR: https://github.com/Kotlin/kotlinx.coroutines/pull/1558
flow.chunked(2) { twoEmissions -> combine(twoEmissions) }
It is not going into the coroutines lib, as it does not deal with time-based chunking/windowing. But for now it is the best solution for size-based chunking.
Hi all, I'm curious if there's been any work on this. I reach for something like this about once every two weeks, and keep coming back to this thread.
I have proposed a design for unified time- and size-based chunking in #1302. You are welcome to comment or just give a thumbs up (or down).
No idea what the coroutines team's plan is regarding this issue, though.
I'm realizing that I actually want a slightly different behavior from what I've seen discussed thus far, because really all I want is to be able to convert a stream of values to a batch operation when appropriate.
something like the following:
/**
 * [chunked] buffers a maximum of [maxSize] elements, preferring to emit early rather than wait
 * if fewer than [maxSize] are available.
 *
 * If [checkIntervalMillis] is specified, [chunked] suspends for [checkIntervalMillis] to allow
 * the buffer to fill.
 *
 * TODO: move to kotlin common
 */
fun <T> Flow<T>.chunked(maxSize: Int, checkIntervalMillis: Long = 0): Flow<List<T>>
This is optimizing for a database that performs better with batch operations than a number of small ones, and for which it's safest to restrict writes to once a second.
My implementation is here. I'm sure I'm using coroutines incorrectly somehow:
https://gist.github.com/AWinterman/8516d4869f491176ebb270dafbb23199
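This is not the gist's actual code, but a rough sketch of an operator with that signature, assuming a channel-based buffer; the channel capacity and the emit-early handling of partial chunks are my assumptions:

```kotlin
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun <T> Flow<T>.chunked(maxSize: Int, checkIntervalMillis: Long = 0): Flow<List<T>> =
    channelFlow {
        // Buffer upstream into a channel; its capacity bounds how far upstream can run ahead.
        val upstream = produce(capacity = maxSize) { collect { send(it) } }
        while (true) {
            // Suspend for the first element so we never busy-wait on an empty buffer.
            val first = upstream.receiveCatching().getOrNull() ?: break
            val chunk = ArrayList<T>(maxSize)
            chunk += first
            // Then take whatever is already buffered, up to maxSize, and emit early.
            while (chunk.size < maxSize) {
                chunk += upstream.tryReceive().getOrNull() ?: break
            }
            send(chunk)
            delay(checkIntervalMillis) // optional pacing between chunks
        }
    }

fun main() = runBlocking {
    (1..10).asFlow().chunked(maxSize = 3).collect { println(it) }
}
```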
Seems that your chunked operator will suspend after filling up the buffer (max size reached), but it will not emit until checkIntervalMillis is reached - checkIntervalMillis is a must-have condition for it to emit.
Is that intentional?
@circusmagnus I'm not sure I follow.
- Suspending after filling up the buffer is intentional. If the buffer is full, we need to exert backpressure on the upstream flow.
- The delay(checkIntervalMillis) ensures that we do not busy-wait, and that the buffer has a chance to fill up before we collect a chunk and emit. I don't know what you mean by "checkIntervalMillis is a must-have condition for it to emit."
Ah, I just realized that the delay interval can be accomplished downstream:
.transform {
    emit(it)
    delay(100)
}
which makes my use case wholly subsumed by a Flow.chunked operator with a size limit.
- If the buffer is full, we could try to emit rather than suspend upstream until checkIntervalMillis is reached. Perhaps downstream is idle and can accept a new chunk before checkIntervalMillis comes. In your implementation downstream cannot emit more often than checkIntervalMillis specifies - there is a non-circumnavigable delay() there:

while (!buffer.isClosedForReceive) {
    val chunk = getChunk(buffer, maxSize)
    emit(chunk)
    delay(checkIntervalMillis) // <- we cannot emit more often than that
}

- checkIntervalMillis is a must-have condition to emit, but maxSize is not: we can emit before reaching max size, but we cannot emit more often than checkIntervalMillis says. Was that intentional? Do you need to limit the frequency of emissions in your use case?
Ah yes, it was intentional: emit no more frequently than X. But as I stated above, because flows are composable, this can be accomplished with a downstream flow operation, so my use case is entirely satisfied by a Flow.chunked operator with a size limit.