ConflatedBroadcastChannel is a good fit for _dataflow programming_. It represents an asynchronously updatable value, and its consuming operation "does the right thing" by delivering the most recent value and all subsequent updates. In the domain of dataflow programming the common operation is to process each value in a separate coroutine. For example, in Android:
val x = ConflatedBroadcastChannel() // a model for some value x
launch(UI) {
    x.consumeEach { someView.text = it.toString() }
}
This launch(context) { consumeEach { ... } } pattern is very common, so some dedicated terminal operator seems to be in order for it. Possible names are:
launchConsumeEach is an obvious, but verbose choice, see https://github.com/dmytrodanylyk/coroutines-arch/blob/master/library/src/main/java/com/kotlin/arch/ChannelsExt.kt by @dmytrodanylyk
The challenge that we have is that all terminal operators currently defined on ReceiveChannel are suspending functions that work in the context of the invoker. All non-suspending operators that receive a context as a parameter perform some kind of transform and return another representation. This immediately suggests that launchConsumeEach should become such an intermediate operator and that its result should be a Job. But can we find a better name for it? Some verb in the style of the other operators we have?
@elizarov Your snippet should be x.consumeEach { … } as there's no receiver.
Why not someChannel.launchConsumer(UI, parent = job) { … }?
@LouisCAD Thanks for the correction to my snippet.
launchConsumer does not make it obvious that I'm operating on each element in the block. We've been discussing if we could use onEach as a name (short is on topic), but Sequence.onEach is an intermediate operator that also sends each element downstream, which is not what we want to achieve here.
someChannel.consumeEachAsync(UI, parent = job) { … }? I'm not a 100% fan because async returns a Deferred, not a Job, but I find it natural to read otherwise.
I see analogies to PR #274, specifically for map, combine and launchObserver.
Is this issue related to that PR?
All concepts are indeed related.
Hi @elizarov
I consider the two discussions strictly related. Your problem is to create an "asynchronously updatable value", and this looks covered by PR #274. If that PR does not solve your problem, maybe we need to find a valid use case for #274.
The problem raised here is to create an observer for a value (a particular kind of actor), which @jcornaz and I discussed here: https://github.com/Kotlin/kotlinx.coroutines/pull/274#issuecomment-371487985 https://github.com/Kotlin/kotlinx.coroutines/pull/274#issuecomment-371497027
It is possible to solve this issue by creating an actor that takes a "channel" as a parameter instead of a "capacity".
How about drainWith to indicate a terminal operation?
fun <T> ReceiveChannel<T>.drainWith(block: suspend (T) -> Unit, ...) =
    launch(...) {
        consumeEach {
            block(it)
        }
    }
@fvasco You are right. In #274 you propose a launchObserver operator. It is, indeed, more convenient than the solution we have now.
To recap. Now we have to write:
launch(UI) { // sets context for the last consumeEach -- very unintuitive!
    source
        .filter { ... }
        .consumeEach { ... }
}
Your proposal:
.filter { ... }
.launchObserver(UI) {
    consumeEach { ... }
}
It is definitely better. But now we see that launchObserver and consumeEach will often be used together. We can do even better and combine them into launchConsumeEach:
.filter { ... }
.launchConsumeEach(UI) { ... }
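For illustration, here is a minimal sketch of what such a combined operator could look like. It is written against the current stable kotlinx.coroutines API rather than the experimental one used elsewhere in this thread. The operator is hypothetical and not part of the library; taking the CoroutineScope as an explicit parameter and the helper collectAll are assumptions made only for this example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical operator (not a library function): launches a coroutine in
// the given scope that consumes every element of this channel.
fun <E> ReceiveChannel<E>.launchConsumeEach(
    scope: CoroutineScope,
    context: CoroutineContext = EmptyCoroutineContext,
    action: suspend (E) -> Unit
): Job = scope.launch(context) {
    // consumeEach is inline, so the suspending action can be called here.
    consumeEach { action(it) }
}

// Hypothetical helper that exercises the operator and returns what was seen.
fun collectAll(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    val channel = Channel<Int>(Channel.UNLIMITED)
    val job = channel.launchConsumeEach(this) { received += it }
    for (i in 1..3) channel.send(i)
    channel.close() // lets consumeEach finish once the buffer is drained
    job.join()
    received
}
```

The returned Job lets the caller cancel or join the consumer, which matches the observation above that the result of such an operator should be a Job.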
@elizarov
Some more ideas:
E.g.
channel.consumeEachWithCoroutine(CommonPool) { }
channel.consumeEachIndexedWithCoroutine(CommonPool) { }
channel.consumeEachWithContext(CommonPool) { }
channel.consumeEachIndexedWithContext(CommonPool) { }
channel.consumeEachByCoroutine(CommonPool) { }
channel.consumeEachIndexedByCoroutine(CommonPool) { }
channel.consumeEachWith(CommonPool) { }
channel.consumeEachIndexedWith(CommonPool) { }
channel.consumeEachBy(CommonPool) { }
channel.consumeEachIndexedBy(CommonPool) { }
Here is another idea, how about introducing CoroutineChannelConsumer class (maybe with better name)? 🤔
This class will spawn coroutine for consumeEach and consumeEachIndexed.
val consumer = CoroutineChannelConsumer(channel, CommonPool)
// does launch(CommonPool) channel.consume { } under the hood
consumer.consumeEach { ... } // non-suspend fun
// does launch(CommonPool) channel.consume { } under the hood
consumer.consumeEachIndexed { ... } // non-suspend fun
Pros
Cons
CoroutineChannelConsumer object
What about channel.doOnEach { … }?
It is similar to Android KTX naming for extensions like doOnLayout { … }.
However, it does not include the word consume. I don't know if documenting the fact that it consumes the channel would be enough.
Here's what I'm using at the moment:
import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.coroutineContext
fun <E> ReceiveChannel<E>.doOnEach(
    context: CoroutineContext = DefaultDispatcher,
    parent: Job? = null,
    action: suspend (E) -> Unit
) = launch(context, parent = parent) { consumeEach { action(it) } }
suspend fun <E> ReceiveChannel<E>.doOnEach(action: suspend (E) -> Unit) = launch(coroutineContext) {
    consumeEach { action(it) }
}
I just had two new name ideas: channel.invokeOnEach { .. } and channel.invokeForEach { .. }.
Upside: Same naming as invokeOnCompletion.
Downside: Not so clear that it consumes the channel.
Here's my new version with structured concurrency support (non suspend version has been removed):
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlin.coroutines.experimental.coroutineContext
suspend inline fun <E> ReceiveChannel<E>.doOnEach(
    context: CoroutineContext = EmptyCoroutineContext,
    noinline action: suspend (E) -> Unit
) = CoroutineScope(coroutineContext).launch(context) { consumeEach { action(it) } }
We've been designing a similar operator for the cold streams of #254, and the idea we've decided to go with will be usable for channels, too. If you have a channel, it would look like this:
channel.asFlow().launchIn(scope) {
    onEach { it -> action(it) }
    catch<SomeException> { it -> handleException(it) }
    finally { releaseSomeResources() }
}
The other way to use this API would be like this:
scope.launchFlow(channel.asFlow()) {
    // same DSL here
}
channel.asFlow().launchIn is the way to go, fixed in 1.3.0-M1
A correction: The released replacement for the proposed channel.launchConsumeEach(scope) { block } is channel.asFlow().onEach { block }.launchIn(scope)
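As a rough, self-contained sketch of that released pattern, assuming a recent kotlinx.coroutines release: the example below uses consumeAsFlow() because the channel here is a plain ReceiveChannel, whereas the comment above writes asFlow(). The helper name collectViaFlow is hypothetical and exists only for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper: consumes a channel through the Flow API and
// returns the elements observed by onEach.
fun collectViaFlow(): List<String> = runBlocking {
    val seen = mutableListOf<String>()
    val channel = Channel<String>(Channel.UNLIMITED)

    // onEach registers the per-element action; launchIn starts collecting
    // in the given scope and returns the Job of the collecting coroutine.
    val job = channel.consumeAsFlow()
        .onEach { seen += it }
        .launchIn(this)

    channel.send("a")
    channel.send("b")
    channel.close() // completes the flow after buffered elements are delivered
    job.join()
    seen
}
```

Because launchIn takes the scope explicitly, the consumer's lifetime is tied to that scope, which is the structured-concurrency behaviour the earlier doOnEach variants were working toward.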