Kotlinx.coroutines: Convert `ReceiveChannel<T>` to `Flow<T>` without consuming

Created on 30 Aug 2019  路  9Comments  路  Source: Kotlin/kotlinx.coroutines

My ViewModel exposes a stream of events as a ReceiveChannel<T>

// in `ViewModel`
private val events = Channel(capacity = Channel.UNLIMITED) // these are "side effects"

fun events(): ReceiveChannel<T> = events

My activity/fragment observes this stream of events, without consuming on cancellation.

// in an Activity's onCreate

launch {
  for (event in viewModel.events()) {
    ui.handleEvent(event)
  }
}

By not consuming on cancellation (unsubscription), this allows my UI to re-use the same ReceiveChannel<T> instance across configuration changes.

// this exists already
fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T>
// this does not
fun <T> ReceiveChannel<T>.asFlowWithoutConsuming: Flow<T>

I think the latter function would be useful. Can it be added?

enhancement flow

All 9 comments

Why would you need to have it exposed as Flow type? What is wrong with the code that you currently have that explicitly declares your events with ReceiveChannel<T> type? It looks easier to understand for me this way (the way you have it now).

@elizarov because a ReceiveChannel<T> is mutable - it can be closed by the caller.

// this is fine
launch {
  for (event in viewModel.events()) {
    ui.handleEvent(event)
  }
}
// this is a bug, because on rotation when you subscribe again to the same channel instance
// we get a crash because the channel is already closed
launch {
  viewModel.events()
    .consumeEach { event -> ui.handleEvent(event) }
}

Thanks. That's clear. How about receiveAsFlow name for this conversion?

perfect name!

My original thinking is that receive indicates a single value, but then I realized there is consume for a single value and for multiples there is consumeEach. Now I
am just realizing I originally felt this way because there is no ReceiveChannel.receiveEach. Should there be a .receiveEach { } too? (maybe could make this a separate issue)

Hey @ZakTaccardi , @elizarov. I have had a look at this issue , no one has implemented it yet and would love to take it up and raise a pr on this. To implement this i was thinking of such an approach .

public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ReceiveAsFlow(this)
Then a class similar to the consumeAsFlow that doesn't use the markConsumed().
```private class ReceiveAsFlow(
private val channel: ReceiveChannel,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL
) : ChannelFlow(context, capacity) {

override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
        ConsumeAsFlow(channel, context, capacity)

override suspend fun collectTo(scope: ProducerScope<T>) =
        SendingCollector(scope).emitAll(channel) // use efficient channel receiving code from emitAll

override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
    return if (capacity == Channel.OPTIONAL_CHANNEL) {
        channel // direct
    } else
        super.produceImpl(scope) // extra buffering channel
}

override suspend fun collect(collector: FlowCollector<T>) {
    if (capacity == Channel.OPTIONAL_CHANNEL) {
        collector.emitAll(channel) // direct
    } else {
        super.collect(collector) // extra buffering channel, produceImpl will mark it as consumed
    }
}

override fun additionalToStringProps(): String = "channel=$channel, "

}
```
Any thoughts and feedback on this will be appreciated on whether the approach is feasible or not. Thanks.

There should not be any cut-and-paste in this method implementation, as we are concerned about the overall size of the library. I've sketched the right-looking implementation in this commit: https://github.com/Kotlin/kotlinx.coroutines/commit/b16beb00b88a16a992e0d9f84184ae68a39d8226 It needs tests, though. There are also open design issue with respect to the consistency of operator naming and its interaction with emitAll. We'll get back to it when time permits.

Thanks for the feedback. I have seen the implementation and definitely learned from it.

Here's a super basic implementation, though may have race conditions where the ReceiveChannel<T> emits but the Flow<T> doesn't due to a race condition around unsubscription?

/**
 * Converts `this` [ReceiveChannel] into a [Flow] without closing the [ReceiveChannel] when the [Flow] is cancelled.
 */
fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = flow {
    val flowCollector = this

    for (item in this@receiveAsFlow) {
        flowCollector.emit(item)
    }
}

@ZakTaccardi I have faced a weird problem as your implementation worked for me, that I was using kotlinx coroutines with vertx kotlin coroutine extension where they expose the pubsub as a channel, and I tried to use it as a receive flow by the extensions to channel.

Unfortunately it didn't work, and so I came up with the equivalent of what you did:

@ExperimentalCoroutinesApi
fun <T> ReceiveChannel<T>.receiveAsFlowByIterationUnsafe(): Flow<T> = flow {
    [email protected] { emit(it) }
}

The original form is like this:

val eventFlow = flow { vertx.eventBus().localConsumer<Event<String, NetSocket>>("fire").toChannel(vertx).consumeEach { emit(it) } }
// do any thing...

After I checked out the consumeEach function I see that it is shockingly the same as you do, and also because receiveAsFlow and Flow.emitAll(chan: ReceiveChannel) are equivalent we cannot use it as an alternative substitute. So I'm also left hanging with the problem you faced

It seems like we need to do a redesign regarding channel and flow. I don't know why, but for some reason there aren't receiveOrClosed from RendezvousChannel?

Anyway, this might be the final form of my extension that preserved the relations receiveAsFlow had while sidestepping the receiveOrClosed abstract method problem:

@ExperimentalCoroutinesApi
fun <T> ReceiveChannel<T>.receiveAsFlowByIteration(): Flow<T> = flow {
    cancel(CancellationException("receive stream terminated",
        [email protected] { while (!isClosedForReceive) emit(receive()) }
            .exceptionOrNull()
    ))
}
Was this page helpful?
0 / 5 - 0 ratings