Kotlinx.coroutines: zipWithNext for Flow

Created on 19 Jan 2020 · 8 Comments · Source: Kotlin/kotlinx.coroutines

Can we have a zipWithNext operator for Flow? Could you help me implement one?

flow

All 8 comments

The implementation is pretty straightforward, but what's your use case? Why do you need it?

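import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Emits transform(previous, current) for every pair of adjacent upstream values.
@Suppress("UNCHECKED_CAST")
fun <T, R> Flow<T>.zipWithNext(transform: (T, T) -> R): Flow<R> = flow {
    var prev: Any? = UNDEFINED
    collect { value ->
        // Skip the first value: there is no previous element to pair it with yet.
        if (prev !== UNDEFINED) emit(transform(prev as T, value))
        prev = value
    }
}

// Sentinel for "no previous value yet"; unlike null, it can never be an upstream value.
private object UNDEFINED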
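
To make the behaviour of the operator concrete, here is a small sketch that runs it on a few hard-coded flows. The input values are made up for illustration, and the operator is repeated only so the snippet compiles on its own; it assumes a recent kotlinx.coroutines version on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

private object UNDEFINED

// Same operator as in the comment above, repeated so the sketch is self-contained.
@Suppress("UNCHECKED_CAST")
fun <T, R> Flow<T>.zipWithNext(transform: (T, T) -> R): Flow<R> = flow {
    var prev: Any? = UNDEFINED
    collect { value ->
        if (prev !== UNDEFINED) emit(transform(prev as T, value))
        prev = value
    }
}

fun main() = runBlocking {
    // Differences between consecutive elements.
    val deltas = flowOf(1, 2, 4, 7).zipWithNext { a, b -> b - a }.toList()
    println(deltas) // [1, 2, 3]

    // A single-element flow has no adjacent pair, so nothing is emitted.
    val single = flowOf(42).zipWithNext { a, b -> a to b }.toList()
    println(single) // []

    // null is an ordinary value here: the sentinel, not null, marks "no previous element".
    val withNulls = flowOf<String?>(null, "a").zipWithNext { a, b -> a to b }.toList()
    println(withNulls) // [(null, a)]
}
```

A flow of n elements yields n - 1 results, which matches how zipWithNext behaves on ordinary Kotlin collections.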

Thanks for helping out @elizarov

Well, it's not something that can't be done differently. I'm new to coroutines, but I am building a Bluetooth library on Android using coroutines and Flow.

During the readout, the response arrives byte by byte via a Flow, and when collecting those bytes I need to check whether a specific pair of hex bytes (02 FF) is included in the response.

But I came up with an alternative: buffering and accumulating the whole Bluetooth response into a ByteArray before it is emitted to the Flow. That way collect gives me a ByteArray instead of a single Byte, so I can easily search for that pair of values.
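
The search over an accumulated response could look roughly like the sketch below. The names `containsMarker`, `readFully`, `MARKER_FIRST` and `MARKER_SECOND` are hypothetical, and collecting a `Flow<Byte>` with `toList()` is only a stand-in for whatever buffering the Bluetooth layer actually does.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Marker pair from the description above: 0x02 followed by 0xFF (names are hypothetical).
val MARKER_FIRST: Byte = 0x02
val MARKER_SECOND: Byte = 0xFF.toByte()

// True if the two marker bytes appear next to each other anywhere in the array.
fun ByteArray.containsMarker(): Boolean =
    (0 until size - 1).any { i -> this[i] == MARKER_FIRST && this[i + 1] == MARKER_SECOND }

// Collects a finite byte-by-byte flow into a single ByteArray.
suspend fun Flow<Byte>.readFully(): ByteArray = toList().toByteArray()

fun main() = runBlocking {
    val response = flowOf<Byte>(0x10, 0x02, 0xFF.toByte(), 0x03).readFully()
    println(response.containsMarker()) // true

    // Both bytes are present but not adjacent, so this is not a match.
    println(byteArrayOf(0x02, 0x03, 0xFF.toByte()).containsMarker()) // false
}
```

Working on the whole array keeps the check a plain index scan and avoids emitting every byte as a separate flow element.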

Good that you've found a better way. zipWithNext is a bad way to do it and using flow for byte-by-byte transfer is not ideal either.

I would've found this useful for diffing upstream values into downstream change events.

datasets // DataSet1… DataSet2… DataSet3… DataSet… DataSet… 
   .onStart { emit(DataSet.empty) } // DataSet.empty… DataSet1… DataSet2… DataSet3… DataSet… DataSet… 
   .zipWithNext { a, b -> b.changesSince(a) } // [Change1a, Change1b, Change1c]… [Change2a, Change2b]…
   .flatMapConcat { changes -> changes.asFlow() } // Change1a… Change1b… Change1c… Change2a… Change2b… 
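
A runnable version of this pipeline might look like the sketch below. The `DataSet` class, its `changesSince` diff (which only reports added items) and the `changes()` helper are hypothetical stand-ins, since the real types are not shown in this thread; `zipWithNext` is the implementation posted earlier.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the DataSet type used above.
data class DataSet(val items: Set<String>) {
    // Hypothetical diff: reports items present now that were absent in the older snapshot.
    fun changesSince(older: DataSet): List<String> = (items - older.items).map { "added $it" }

    companion object {
        val empty = DataSet(emptySet())
    }
}

private object UNDEFINED

// zipWithNext as posted earlier in the thread.
@Suppress("UNCHECKED_CAST")
fun <T, R> Flow<T>.zipWithNext(transform: (T, T) -> R): Flow<R> = flow {
    var prev: Any? = UNDEFINED
    collect { value ->
        if (prev !== UNDEFINED) emit(transform(prev as T, value))
        prev = value
    }
}

// Seeds the stream with an empty data set so the first real snapshot is diffed as well.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun Flow<DataSet>.changes(): Flow<String> =
    onStart { emit(DataSet.empty) }
        .zipWithNext { a, b -> b.changesSince(a) }
        .flatMapConcat { it.asFlow() }

fun main() = runBlocking {
    val datasets = flowOf(DataSet(setOf("a")), DataSet(setOf("a", "b", "c")))
    println(datasets.changes().toList()) // [added a, added b, added c]
}
```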

@fluidsonic Thanks for the use case.

@elizarov Should this be added to core? I currently need to merge the values of each emission. Is there a better way?

@RinOrz You can use this implementation for now -> https://github.com/Kotlin/kotlinx.coroutines/issues/1767#issuecomment-577158308

Yes, I am currently doing that. What I meant is that it might be worth adding to the coroutines core, since it could be useful.
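
As an alternative to the custom operator linked above, adjacent pairs can also be assembled from operators that already exist in the library. The sketch below uses `scan` to keep a sliding window of at most two elements; the helper name `pairwise` is hypothetical, and this allocates a small list per element, so it is an illustration rather than a recommendation.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: pairs each element with its predecessor using only built-in operators.
fun <T> Flow<T>.pairwise(): Flow<Pair<T, T>> =
    // The window holds the previous element (if any) plus the current one.
    scan(emptyList<T>()) { window, value -> window.takeLast(1) + value }
        // scan also emits the initial empty window and the one-element window; drop both.
        .filter { it.size == 2 }
        .map { it[0] to it[1] }

fun main() = runBlocking {
    println(flowOf(1, 2, 3).pairwise().toList()) // [(1, 2), (2, 3)]
}
```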
