For ConflatedChannel the problem was discussed in #332, addressed in #1235 and fixed by #1239, but it was not fixed for ConflatedBroadcastChannel.
Slightly modified example from #1235:
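    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        val channel = Channel<Int>(capacity = Channel.CONFLATED)
        with(channel) {
            // Conflated: each send replaces the previously buffered element
            send(1); send(2); send(3); send(4)
            close()
        }
        println(channel.receive())
    }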
Correctly prints 4, but
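    import kotlinx.coroutines.channels.ConflatedBroadcastChannel
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        val channel = ConflatedBroadcastChannel<Int>()
        with(channel) {
            send(1); send(2); send(3); send(4)
            close()
        }
        // Subscribing after close() does not deliver the last value
        println(channel.openSubscription().receive())
    }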
throws ClosedReceiveChannelException
Is it by design?
It is the design now, but we can change it. What is your use-case for ConflatedBroadcastChannel where this difference in behavior becomes important?
I just ran into the same issue.
I have a ConflatedBroadcastChannel for emitting DownloadState values (initial, running…, completed).
    class DownloadCoordinator {
        private val stateChannel = ConflatedBroadcastChannel<DownloadState>(DownloadState.Initial)
        val states = stateChannel.asFlow()
        // etc.
    }
Emitting looks like this:
Now a consumer that's interested in the latest state of the download simply does this:
downloadCoordinator.states.first()
It was totally unexpected to me that .first() doesn't work on a ConflatedBroadcastChannel-backed Flow once the channel is closed:
NoSuchElementException: Expected at least one element
I expected the flow to re-emit the last value.
I also have other consumers that use states.collect { … } to follow the progression of the download. That should complete automatically once the terminal state (completed) was reached.
From the docs:
ConflatedBroadcastChannel
Broadcasts the most recently sent element (aka value) to all openSubscription subscribers. Back-to-send sent elements are conflated – only the most recently sent value is received, while previously sent elements are lost. Every subscriber immediately receives the most recently sent element. Sender to this broadcast channel never suspends and offer always returns true.
It mentions three times that the "most recently sent element/value" is received by subscribers.
The only mention of different behavior after closing is in the description of valueOrNull:
The most recently sent element to this channel or null when this class is constructed without initial value and no value was sent yet or if it was closed.
Thanks for the detailed explanation!
It seems like this particular problem will be resolved by a stateful flow (StateFlow).
Its implementation is being provided in #1974.
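To illustrate the stateful-flow idea, here is a minimal sketch. It assumes the StateFlow API as it later shipped in kotlinx.coroutines (MutableStateFlow, asStateFlow). The DownloadState values and the run() method are hypothetical and only mirror the DownloadCoordinator example above. Because a StateFlow is never closed, a consumer that arrives late can still read the latest state with first().

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical states, named after the DownloadState example in this thread.
enum class DownloadState { Initial, Running, Completed }

class DownloadCoordinator {
    private val _states = MutableStateFlow(DownloadState.Initial)

    // Read-only view for consumers.
    val states: StateFlow<DownloadState> = _states.asStateFlow()

    // Stand-in for the real download: just walks through the states.
    fun run() {
        _states.value = DownloadState.Running
        _states.value = DownloadState.Completed
    }
}

// A consumer that subscribes only after the download has finished.
fun latestState(): DownloadState = runBlocking {
    val coordinator = DownloadCoordinator()
    coordinator.run()
    coordinator.states.first()
}

fun main() {
    println(latestState()) // prints Completed
}
```

Note that collect on a StateFlow never completes on its own, so consumers that follow the download until the terminal state would still have to cut the flow off themselves.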
@elizarov Sorry for the late reply in these difficult times.
My use case is the same as in #332, which is why I did not specify it. More specifically, it is the second use case in your comment.
I have a computation producing intermediate results, which I want to expose as a Flow from the API. So I have a private ConflatedBroadcastChannel and expose it via channel.asFlow().
My original intention was to close the channel normally when the computation is complete, to indicate that the last result is the final result of the computation, and to close it with an error if the computation fails.
But if the client collects the flow after the channel is closed, it gets no results at all.
Currently my solution is to never close channels, which feels wrong. I also had to introduce an additional wrapper class with an isFinal flag, so that I can use a takeUntil { it.isFinal } operator on the flow before returning it. Btw, currently there is no such operator in the library.
Anyway, my point was that ConflatedChannel and ConflatedBroadcastChannel should behave similarly. I don't see why the number of clients should change the semantics with respect to the last value. Deliver the last value or not, but shouldn't it be consistent? What is the fundamental difference in semantics between the two that I am missing? I thought that it was just overlooked and forgotten, so I opened this issue as a reminder.
Now, you can close the issue if you like, but in my opinion it is not resolved. Neither StateFlow nor SharedFlow can be used for a computation with intermediate values, because they are both designed for never-ending streams. They cannot be closed and there are no mechanisms for error propagation.
@pacher As discussed in SharedFlow design #2034 you can materialize completion. There is no takeUntil operator, but there is a takeWhile operator, so you can write takeWhile { !it.isFinal } on the collector's side of the SharedFlow. Moreover, shared flow lets you replay any number of items to the collectors, so you can configure shared flow with replayCapacity = 2 to get both the final result and the completion status of the computation. Thus, it seems to me that the shared flow design in #2034 fully covers your use-case.
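A rough sketch of the materialized-completion idea, for illustration only. It assumes the MutableSharedFlow constructor as it was eventually released, where the parameter is named replay rather than replayCapacity. The Update type is hypothetical. With a replay of 2, a collector that arrives after the computation has finished still sees the last value followed by the completion marker.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical materialized events: a value, or a marker that the computation is done.
sealed class Update {
    data class Value(val result: Int) : Update()
    object Completed : Update()
}

fun lateCollector(): List<Update> = runBlocking {
    val updates = MutableSharedFlow<Update>(replay = 2)

    // No subscribers yet, so emit() returns immediately and only fills the replay cache.
    updates.emit(Update.Value(1))
    updates.emit(Update.Value(2))
    updates.emit(Update.Value(3))
    updates.emit(Update.Completed)

    // The late collector receives the replayed Value(3) and Completed;
    // takeWhile stops the otherwise never-ending collection at the marker.
    updates.takeWhile { it !is Update.Completed }.toList()
}

fun main() {
    println(lateCollector()) // prints [Value(result=3)]
}
```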
so you can write
takeWhile { !it.isFinal }
@elizarov No I can't, because then final result will not be included, which is a bummer.
The difference from takeWhile is that the predicate is checked after emitting the item, so the item that flipped the predicate is included as well. RxJava and Reactor both have the takeWhile/takeUntil combo. Weirdly, I did not find it in Kotlin collections; I was sure it was there too.
For the implementation I copy-pasted takeWhile and changed
    if (predicate(value)) emit(value)
    else throw AbortFlowException(this)
to
    emit(value)
    if (predicate(value)) throw AbortFlowException(this)
But it would be really nice to have it in the library (even simple takeWhile uses internal classes which I had to copy as well). Maybe somebody can make a PR, maybe even me.
As discussed ... shared flow design in #2034 fully covers your use-case
Yes, I got your point in #2034 that materializing is the way to go, which I still think is a workaround (and not only me). I would rather say: "SharedFlow could be used in building a solution which covers your use-case".
While we are waiting for proper operators in the library, can you please have a look at this naive implementation of dematerialize?
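    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.map
    import kotlinx.coroutines.flow.takeWhile

    // `out T` makes Event<Nothing> a subtype of Event<T>, so Error and Complete fit any Flow<Event<T>>
    sealed class Event<out T> {
        class Emission<T>(val item: T) : Event<T>()
        class Error(val error: Throwable) : Event<Nothing>()
        object Complete : Event<Nothing>()
    }

    fun <T> Flow<Event<T>>.dematerialize(): Flow<T> =
        takeWhile { it !is Event.Complete }   // stop collecting at the completion marker
            .map {
                when (it) {
                    is Event.Emission -> it.item
                    is Event.Error -> throw it.error   // qualified, so it is not confused with kotlin.Error
                    Event.Complete -> error("Can not happen")
                }
            }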
Is it usable? Is it idiomatic just to throw from map? Will it play nicely with cancelling subscription upstream, cleaning up and whatnot (my source is currently still ConflatedBroadcastChannel.asFlow() which uses openSubscription)? I guess the answers are yes, but I am still too new to coroutines to feel confident
Thanks
P.S. Again, the issue was not about my use case, but about consistency between ConflatedChannel and ConflatedBroadcastChannel. One delivers last value and the other does not. That's just weird and potentially confusing, that's all. It's up to you to leave it like it is.
so you can write
takeWhile { !it.isFinal }
No I can't, because then final result will not be included, which is a bummer.
@pacher Good point. Thanks. See #2042
While we are waiting for proper operators in the library, can you please have a look at this naive implementation of dematerialize?
Is it usable? Is it idiomatic just to throw from map? Will it play nicely with cancelling subscription upstream, cleaning up and whatnot (my source is currently still ConflatedBroadcastChannel.asFlow() which uses openSubscription)? I guess the answers are yes, but I am still too new to coroutines to feel confident
This implementation looks good to me and it is quite idiomatic. It is similar to what we'll write if we make a decision to include that kind of operator into the library.
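For anyone who wants to try it, here is a small usage sketch of a dematerialize operator of this shape. The Event declaration is restated with `out T` (an assumption, so that Event<Nothing> fits any Flow<Event<T>>) to keep the snippet self-contained. The helper functions collected() and failure() are hypothetical. It shows that items after Complete are not delivered and that an Error event surfaces in the collector as a regular exception.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

sealed class Event<out T> {
    class Emission<T>(val item: T) : Event<T>()
    class Error(val error: Throwable) : Event<Nothing>()
    object Complete : Event<Nothing>()
}

fun <T> Flow<Event<T>>.dematerialize(): Flow<T> =
    takeWhile { it !is Event.Complete }
        .map {
            when (it) {
                is Event.Emission -> it.item
                is Event.Error -> throw it.error
                Event.Complete -> error("Can not happen")
            }
        }

// Collection stops at Complete; the trailing Emission(3) is never seen.
fun collected(): List<Int> = runBlocking {
    flowOf<Event<Int>>(Event.Emission(1), Event.Emission(2), Event.Complete, Event.Emission(3))
        .dematerialize()
        .toList()
}

// The materialized error is rethrown inside map and reaches the collector.
fun failure(): String? = runBlocking {
    try {
        flowOf<Event<Int>>(Event.Emission(1), Event.Error(IllegalStateException("boom")))
            .dematerialize()
            .toList()
        null
    } catch (e: IllegalStateException) {
        e.message
    }
}

fun main() {
    println(collected()) // prints [1, 2]
    println(failure())   // prints boom
}
```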
We've decided to add Flow.transformWhile operator for now to cover the use-case of delivering last value and then completing. See #2065 for details.
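A minimal sketch of how a takeUntil-style operator could be written on top of transformWhile, assuming the operator as it is available in kotlinx.coroutines.flow. The takeUntil helper and the Step type are hypothetical and not part of the library. The comparison with takeWhile shows the difference discussed in this thread: the item that flips the predicate is included.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits items up to and including the first one matching the predicate.
fun <T> Flow<T>.takeUntil(predicate: suspend (T) -> Boolean): Flow<T> =
    transformWhile { value ->
        emit(value)
        !predicate(value) // returning false stops the collection after this item
    }

// Hypothetical wrapper with an isFinal flag, as described earlier in the thread.
data class Step(val value: Int, val isFinal: Boolean = false)

val steps = flowOf(Step(1), Step(2), Step(3, isFinal = true), Step(4))

fun withTakeWhile(): List<Int> = runBlocking {
    steps.takeWhile { !it.isFinal }.toList().map { it.value }
}

fun withTakeUntil(): List<Int> = runBlocking {
    steps.takeUntil { it.isFinal }.toList().map { it.value }
}

fun main() {
    println(withTakeWhile()) // prints [1, 2]
    println(withTakeUntil()) // prints [1, 2, 3]
}
```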