Kotlinx.coroutines: Flow: withLatestFrom operator

Created on 3 Sep 2019 · 18 Comments · Source: Kotlin/kotlinx.coroutines

See #1315 for one of the use-cases. Note that this operator can be implemented in quite a straightforward way using stable Flow APIs. A simple implementation is given below:

fun <A, B: Any, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> = flow {
    coroutineScope {
        val latestB = AtomicReference<B?>()
        val outerScope = this
        launch {
            try {
                other.collect { latestB.set(it) }
            } catch(e: CancellationException) {
                outerScope.cancel(e) // cancel outer scope on cancellation exception, too
            }
        }
        collect { a: A ->
            latestB.get()?.let { b -> emit(transform(a, b)) }
        }
    }
}

TODO: We need to figure out a name for this operation that is consistent with the combine and xxxLatest naming conventions. Even better, it should be a part of the customizable "combine family" of operations where each source flow can either react to the incoming values or just reference the most recent value. It is also related to #1354 and #1082, which would introduce the concept of StateFlow (a flow with the most recent value).

All 18 comments

@elizarov Is this the draft for the real implementation, or is PR #1315 going to be the real implementation and this is just an equivalent for easier reading? Afaiu, there's a requirement in #1315 that withLatestFrom should continue emitting latestB in case B completes before A does, whereas the implementation above seems to cancel the whole thing when B gets cancelled.

Also, a question for my own education, if it's not too much trouble:

  • what's the reason behind catching the cancellation exception and explicitly passing it to .cancel() of the outerScope -- wouldn't it work exactly the same way without the try/catch block because of structured concurrency? (with job introduced by launch being child to job of the coroutineScope block and hence propagating both exceptions and cancellation) Sorry if I missed something, I'm fairly new to kotlinx.coroutines

@ivanbartsov The implementation given above completes only when this (a) flow completes. Completion of the other (b) flow does not terminate it.

The reason for the try/catch is that CancellationException has a special role in structured concurrency. coroutineScope { } does not complete when one of its children crashes with CancellationException, because CancellationException is used as a means to explicitly cancel specific children without touching others.
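To make that concrete, here is a minimal sketch of the behaviour described above. The two function names are made up for illustration and are not part of kotlinx.coroutines. The first function lets a child finish with a CancellationException and reports whether the enclosing scope is still active; the second forwards the exception to the outer scope, mirroring the try/catch in the implementation above.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: a child that ends with CancellationException
// is cancelled on its own; the enclosing scope stays active.
suspend fun scopeSurvivesChildCancellation(): Boolean = coroutineScope {
    val child = launch {
        throw CancellationException("only this child is cancelled")
    }
    child.join() // wait for the child to finish (in the cancelled state)
    isActive     // state of the enclosing scope
}

// Hypothetical helper: forwarding the exception with outerScope.cancel(e)
// cancels the whole scope, so coroutineScope ends with CancellationException.
suspend fun scopeCancelledWhenForwarded(): Boolean = try {
    coroutineScope {
        val outerScope = this
        launch {
            try {
                throw CancellationException("child cancelled")
            } catch (e: CancellationException) {
                outerScope.cancel(e) // same trick as in the implementation above
            }
        }.join()
    }
    false
} catch (e: CancellationException) {
    true
}

fun main() = runBlocking {
    println(scopeSurvivesChildCancellation())
    println(scopeCancelledWhenForwarded())
}
```

Both calls are expected to report true: the scope survives in the first case and is cancelled in the second.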

@elizarov

coroutineScope { } does not complete when one of its children crash with CancellationException

Right, that slipped my mind. Everything falls into place, thanks :)

I think there's a slight difference in behavior with this implementation vs the one in #1315, which is that the latter will still start collecting both flows eagerly but it will apply backpressure to this until other emits its first item. This implementation will drop items from this if other doesn't emit right away.
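For illustration only, the "apply backpressure until other emits" behaviour could be sketched roughly as below. This is not the code from #1315; the names withLatestFromWaiting, NoValueYet and waitingDemo are invented here, and MutableStateFlow is used as the holder purely as an assumption of this sketch. Note that it would wait forever if other completes without emitting anything.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Marker meaning "other has not emitted yet" (hypothetical name).
private object NoValueYet

@Suppress("UNCHECKED_CAST")
fun <A, B, R> Flow<A>.withLatestFromWaiting(
    other: Flow<B>,
    transform: suspend (A, B) -> R
): Flow<R> = flow {
    coroutineScope {
        val latest = MutableStateFlow<Any?>(NoValueYet)
        // Follow `other` concurrently and remember its most recent value.
        val otherJob = launch { other.collect { latest.value = it } }
        collect { a ->
            // Suspends this flow until `other` has produced at least one value,
            // instead of dropping `a`.
            val b = latest.first { it !== NoValueYet }
            emit(transform(a, b as B))
        }
        otherJob.cancel() // stop following `other` once this flow completes
    }
}

// Upstream emits immediately, `other` only after a delay.
suspend fun waitingDemo(): List<String> =
    flowOf(1, 2, 3)
        .withLatestFromWaiting(flow { delay(100); emit("x") }) { a, b -> "$a$b" }
        .toList()

fun main() = runBlocking { println(waitingDemo()) }
```

In this sketch none of the three upstream items is lost: each one is held back until "x" arrives and is then paired with it.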

Correct me if I'm wrong, but this implementation would not support null in the second flow, right? Because the default for the AtomicReference is null.

I needed this operator, so I just used your code, but then realised that null is not allowed for the second flow. I came up with this fix:

private object UNINITIALIZED

fun <A, B, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> =
    flow {
        coroutineScope {
            val latestB = AtomicReference<Any>(UNINITIALIZED)
            val outerScope = this

            launch {
                try {
                    other.collect { latestB.set(it) }
                } catch (e: CancellationException) {
                    outerScope.cancel(e) // cancel outer scope on cancellation exception, too
                }
            }

            collect { a: A ->
                latestB.get().let {
                    if (it != UNINITIALIZED) {
                        emit(transform(a, it as B))
                    }
                }
            }
        }
    }

I think it will work and support null values in both flows. Am I right?

Is there a multi-platform version of this? It would be really useful.

All Flow operators are multi-platform by default.

@BugsBunnyBR ...that moment when your dreams come true. 👍

I meant to ask for a version of this operator that is not bound to the JVM.
As far as I could see, all proposals in this thread use AtomicReference, which is a JVM class.

I find this operator extremely useful for UI reactive programming.

I have a document, which is simply a list of blocks representing different text fields in which user can type something. This document's state is composed from different data streams, including meta-data about this document along with ui-events like a specific block focusing. Only when document's structure is changed (i.e. new blocks added or deleted, etc.), it should be re-rendered.

Given these streams:

val document: Flow<List<Block>>
val focus: Flow<Id>
val metadata: Flow<Metadata>

document.withLatestFrom(focus, metadata) { doc, focus, metadata ->
    doc.mapToView(focus, metadata)
}.collect { render(it) }

I don't need render(it) to be called every time focus or metadata changes (that's why current implementations of combineLatest() or even combine() do not work for this use case). Metadata and focus live on their own and can be changed internally without affecting what is being rendered. It is only when it is time to (re)render the document that we need the latest values from focus and metadata.

It is only when it is time to (re)render the document that we need the latest values from focus and metadata.

@uburoiubu Can you please explain it in a bit more detail? I really don't get how doc.mapToView can need both the focus and metadata parameters (meaning it somehow uses them, so its result surely depends on them) while you don't need to render when they change. If you don't need to render on their change, why would you even need them then?

@BugsBunnyBR In order to make this KMP, you can use the AtomicFU library (https://github.com/Kotlin/kotlinx.atomicfu) or just use ConflatedBroadcastChannel instead of AtomicReference. ConflatedBroadcastChannel uses AtomicFU internally.

For example:

private object Uninitialized

fun <A, B, R> Flow<A>.withLatestFrom(other: Flow<B>, transform: suspend (A, B) -> R): Flow<R> =
    flow {
        coroutineScope {
            val outerScope = this
            val channel = ConflatedBroadcastChannel<Any?>(Uninitialized)

            launch {
                try {
                    other.collect(channel::send)
                } catch (e: CancellationException) {
                    outerScope.cancel(e)
                }
            }

            collect { a ->
                channel.asFlow().first().let { b ->
                    // identity check against the marker object
                    if (b !== Uninitialized)
                        emit(transform(a, b as B))
                }
            }
        }
    }
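As a variation on the same idea, the holder could also be a MutableStateFlow, which is available in common (multiplatform) code. The following is only a sketch under that assumption and is not an API of kotlinx.coroutines; the names withLatestFromSketch, Unset and droppingDemo are invented for this example. Unlike the snippets above, it cancels the collection of other once this flow completes, so the resulting flow can finish even if other never does.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Marker meaning "other has not emitted yet" (hypothetical name).
private object Unset

@Suppress("UNCHECKED_CAST")
fun <A, B, R> Flow<A>.withLatestFromSketch(
    other: Flow<B>,
    transform: suspend (A, B) -> R
): Flow<R> = flow {
    coroutineScope {
        val latest = MutableStateFlow<Any?>(Unset) // null is a legal value of B here
        val otherJob = launch { other.collect { latest.value = it } }
        collect { a ->
            val b = latest.value
            // Items of this flow are dropped while `other` has not emitted yet.
            if (b !== Unset) emit(transform(a, b as B))
        }
        otherJob.cancel() // design choice of this sketch: stop following `other`
    }
}

// Timing-based demo: "a" is available before 1 arrives, "b" before 2 arrives.
suspend fun droppingDemo(): List<String> {
    val other = flow { emit("a"); delay(300); emit("b") }
    val source = flow { delay(100); emit(1); delay(400); emit(2) }
    return source.withLatestFromSketch(other) { n, s -> "$n$s" }.toList()
}

fun main() = runBlocking { println(droppingDemo()) }
```

With the delays chosen above, 1 should be paired with "a" and 2 with "b"; since the demo relies on real time, treat that pairing as an expectation rather than a guarantee.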

Seeing as #1315 seems to have stalled since 2019 and StateFlow has been released, is there anything blocking the implementation of withLatestFrom? Like @uburoiubu, I too find this operator very useful for reactive UI programming and would gladly help with its implementation if needed.

@Progdrasil It would be extremely helpful if you drop a link here with some examples of your code that uses withLatestFrom (the more code -- the better).

if you drop a link here with some examples of your code

Unfortunately the code that uses withLatestFrom is closed source for now. But I can put a few examples inline here. Basically we use it on Android when dealing with forms where we want the data from the form on specific events, for example a button tap.
These examples are using FlowBinding.

val editTextList: List<TextInputEditText>
val textFlows: List<Flow<String>> = editTextList.map { it.textChanges() }

val submit: Button
submit
    .clicks()
    .withLatestFrom(textFlows) { _, data -> validate(data) }
    .launchIn(lifecycleScope)

Note that it's simplified with respect to the actual validation and how the result is shown back to the user.

Otherwise, we also have the following:

val editTextList: List<TextInputEditText>
val validatedFlow: Flow<SomeDataClass> = editTextList
    .map { it.textChanges() }
    .combine { data -> parse(data) }

val submit: Button
submit
    .clicks()
    .withLatestFrom(validatedFlow) { _, data -> applyBusinessLogic(data) }
    .launchIn(lifecycleScope)

@Progdrasil Thanks a lot! What's the value of representing text flows and validatedFlow in this code as a Flow type? It does not seem that this code uses the reactive nature of those types in any way but simply takes their current value using withLatestFrom operator.

May I suggest you consider an alternative way to structure your code that seems simpler to me? Instead of val textFlows: List<Flow<String>> you can have:

val texts: List<String>
    get() = editTextList.map { it.text?.toString().orEmpty() } // retrieve current text when called

It is not using Flow here, since you only use the current text anyway. Then you can write:

submit
    .clicks()
    .map { applyBusinessLogic(texts) }
    .launchIn(lifecycleScope)

You can do a similar simplification for the other snippet. Replace val validatedFlow: Flow<SomeDataClass> with:

```
val validated: SomeDataClass
    get() = parse(textData) // just parse the current text data when called, no flow
```

Then you will be able to write simpler code there, too:

```
submit
    .clicks()
    .map { applyBusinessLogic(validated) }
    .launchIn(lifecycleScope)
```

Does it help?

Without getting into too much detail, the reason we have a list of flows is that it's the end result of creating a bunch of input fields in a recycler view. What we actually end up with is a Map<FieldIDOrPosition, MutableSharedFlow<Flow<String>>>, which becomes a list of flows before the withLatestFrom part.
