Kotlinx.coroutines: launchConsumeEach terminal operator (name needed)

Created on 13 Apr 2018  Â·  18Comments  Â·  Source: Kotlin/kotlinx.coroutines

ConflatedBroadcastChannel is a good fit for _dataflow programming_. It represents an asynchronously updatable value and its consuming operation "does the right thing" of delivering the most recent value and all the subsequent update. In the domain of dataflow programming the common operation is to process each value in a separate coroutine. For example, in Android:

val x = ConflatedBroadcastChannel() // a model for some value x

launch(UI) { 
    x.consumeEach { someView.text = it.toString } 
}

This launch(context) { consumeEach { ... } } pattern is very common, so some dedicated terminal operator seems to be in order for it. Possible names are:

launchConsumeEach is an obvious, but verbose choice, see https://github.com/dmytrodanylyk/coroutines-arch/blob/master/library/src/main/java/com/kotlin/arch/ChannelsExt.kt by @dmytrodanylyk

The challenge that we have is that all terminal operators currently defined on ReceiveChannel are suspending functions that work in the context of invoker. All non-suspending operator that receive context as parameter perform some kind of transform and return another representation. This immediately suggests that launchConsumeEach become such an intermediate operator and its result should be a Job. But can we find a better name for it? Some verb in style of the other operators we have?

design enhancement

Most helpful comment

We've been designing the similar operator for #254 cold stream and the idea we've decided to go with will be usable for channels, too If you have a channel it would look like this:

channel.asFlow().launchIn(scope) { 
    onEach { it -> action(it) }
    catch<SomeException> { it -> handleException(it) }
    finally { releaseSomeResources() }
}

The other way to use this API would be like this:

scope.launchFlow(channel.asFlow()) { 
    // same DSL here
}

All 18 comments

@elizarov Your snippet should be x.consumeEach { … } as there's no receiver.

Why not someChannel.launchConsumer(UI, parent = job) { … }?

@LouisCAD Thanks for correction on my snipped.

launchConsumer does not make it obvious that I'm operating on each element in the block. We've been discussing if we could use onEach as a name (short is on topic), but Sequence.onEach is an intermediate operator that also sends each element downstream, which is not what we want to achieve here.

someChannel.consumeEachAsync(UI, parent = job) { … }? I'm not 100% fan because async returns a Deferred, not a Job, but I find it natural to read otherwise.

I see analogies to PR #274, specifically for map, combine and launchObserver.

Is this issue related to that PR?

All concepts are indeed related.

Hi @elizarov
I consider two discussion strictly related, your problem is to create a "asynchronously updatable value" and this looks covered to PR #274, if that PR does not solve your problem maybe we need to find a valid use case for #274.

The exposed problem is to create an observer for a value (a particular kind of actor), @jcornaz and I discussed here: https://github.com/Kotlin/kotlinx.coroutines/pull/274#issuecomment-371487985 https://github.com/Kotlin/kotlinx.coroutines/pull/274#issuecomment-371497027

It is possible solve this issue creating an actor using a "channel" as parameter instead of "capacity".

How about drainWith to indicate a terminal operation?

fun <T> ReceiveChannel<T>.drainWith(block: suspend (T) -> Unit, ...) =
  launch(...) {
    consumeEach {
      block(it)
    }
  }

@fvasco You are right. In the #274 you propose launchObserver operator. It is, indeed, more convenient that the solution we have now.

To recap. Now we have to write:

launch(UI) { // sets context for the last consumeEach -- very unintuitive!
    source
        .filter { ... }
        .consumeEach { ... }
}

Your proposal:

    .filter { ... }
    .launchObserver(UI) { 
        consumeEach { ... }
    }

It is definitely better. But now we see that launchObserver and consumeEach are going to be often used together. We can do even better and combine then into launchConsumeEach:

    .filter { ... }
    .launchConsumeEach(UI) { ... }

@elizarov

Some more ideas:

E.g.

channel.consumeEachWithCoroutine(CommonPool) {  }
channel.consumeEachIndexedWithCoroutine(CommonPool) {  }

channel.consumeEachWithContext(CommonPool) {  }
channel.consumeEachIndexedWithContext(CommonPool) {  }

channel.consumeEachByCoroutine(CommonPool) {  }
channel.consumeEachIndexedByCoroutine(CommonPool) {  }

channel.consumeEachWith(CommonPool) {  }
channel.consumeEachIndexedWith(CommonPool) {  }

channel.consumeEachBy(CommonPool) {  }
channel.consumeEachIndexedBy(CommonPool) {  }

Here is another idea, how about introducing CoroutineChannelConsumer class (maybe with better name)? 🤔

This class will spawn coroutine for consumeEach and consumeEachIndexed.

val consumer = CoroutineChannelConsumer(channel, CommonPool)

// does launch(CommPool) channel.consume { } under the hood
consumer.consumeEach { ... }  // non-suspend fun

// does launch(CommPool) channel.consume { } under the hood
consumer.consumeEachIndexed { ... }   // non-suspend fun

Pros

  • function names can stay exactly the same

Cons

  • need to wrap channel with CoroutineChannelConsumer object

What about channel.doOnEach { … }?
It is similar to Android KTX naming for extensions like doOnLayout { … }.
However, it does not includes the word consume. I don't know if documenting the fact that it consumes the channel would be enough.

Here's what I'm using at the moment:

import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.coroutineContext

fun <E> ReceiveChannel<E>.doOnEach(
        context: CoroutineContext = DefaultDispatcher,
        parent: Job? = null,
        action: suspend (E) -> Unit
) = launch(context, parent = parent) { consumeEach { action(it) } }

suspend fun <E> ReceiveChannel<E>.doOnEach(action: suspend (E) -> Unit) = launch(coroutineContext) {
    consumeEach { action(it) }
}

I just had two new name ideas: channel.invokeOnEach { .. } and channel.invokeForEach { .. }.

Upside: Same naming as invokeOnCompletion.
Downside: Not so clear that it consumes the channel.

Here's my new version with structured concurrency support (non suspend version has been removed):

import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlin.coroutines.experimental.coroutineContext

suspend inline fun <E> ReceiveChannel<E>.doOnEach(
        context: CoroutineContext = EmptyCoroutineContext,
        noinline action: suspend (E) -> Unit
) = CoroutineScope(coroutineContext).launch(context) { consumeEach { action(it) } }

Gist link: https://gist.github.com/LouisCAD/ec559f6d62f796e9287145c4797400f7/45eedb3c2c612f2ed1e1be18a9199ff6fbd14b70

We've been designing the similar operator for #254 cold stream and the idea we've decided to go with will be usable for channels, too If you have a channel it would look like this:

channel.asFlow().launchIn(scope) { 
    onEach { it -> action(it) }
    catch<SomeException> { it -> handleException(it) }
    finally { releaseSomeResources() }
}

The other way to use this API would be like this:

scope.launchFlow(channel.asFlow()) { 
    // same DSL here
}

channel.asFlow().launchIn is a way to go, fixed in 1.3.0-M1

A correction: The released replacement for the proposed channel.launchConsumeEach(scope) { block } is channel.asFlow().onEach { block }.launchIn(scope)

Was this page helpful?
0 / 5 - 0 ratings