The share() operator operates on Flow<T> and returns Flow<T>. It shall have the following semantics. The resulting flow is cold, but when one collector starts collecting from it, it starts to collect from the upstream flow, activating the upstream emitter. The trick of the share operator is that when additional collectors appear downstream, they all "share" the same upstream emitter.
For example, consider the flow:
```kotlin
val flow = flow {
    var i = 0
    while (true) {
        delay(1000)
        println("Emit $i")
        emit(i++)
    }
}
```
If you launch two collectors:
```kotlin
launch { flow.collect { println("A: got $it") } }
launch { flow.collect { println("B: got $it") } }
```
Then you shall see "Emit 0 / A: got 0 / Emit 0 / B: got 0 / Emit 1 / A: got 1 / Emit 1 / B: got 1 / ...".
However, if you change the flow to val flow = flow { /* same */ }.share(), then you shall see "Emit 0 / A: got 0 / B: got 0 / Emit 1 / A: got 1 / B: got 1 / ...", that is, each emission is delivered to both collectors.
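For comparison, here is a hedged sketch of the same two-collector experiment using the shareIn operator that kotlinx.coroutines eventually shipped (version 1.4 or later is assumed), not the share() proposed above. To make it terminate, the upstream emits only three values and each collector uses take(3). The helper name sharedRun and the upstreamEmissions counter are illustrative only.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Runs two collectors against one shared upstream and reports how many values
// the upstream produced, plus what each collector saw.
fun sharedRun(): Triple<Int, List<Int>, List<Int>> = runBlocking {
    var upstreamEmissions = 0
    val upstream = flow {
        var i = 0
        repeat(3) {
            delay(100)
            upstreamEmissions++
            println("Emit $i")
            emit(i++)
        }
    }
    // A child Job dedicated to the sharing coroutine, so it can be cancelled on its own.
    val sharingJob = Job(coroutineContext[Job])
    val shared = upstream.shareIn(
        CoroutineScope(coroutineContext + sharingJob),
        SharingStarted.WhileSubscribed() // start on the first collector, stop after the last leaves
    )
    // A shared flow never completes, so each collector takes exactly three values.
    val a = async { shared.take(3).onEach { println("A: got $it") }.toList() }
    val b = async { shared.take(3).onEach { println("B: got $it") }.toList() }
    val seenByA = a.await()
    val seenByB = b.await()
    sharingJob.cancel()
    Triple(upstreamEmissions, seenByA, seenByB)
}
```

Each "Emit n" line is printed once, and both collectors receive every value.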
Now if you need to artificially "start" the shared flow simply to keep it active, you can always launch a dummy collector: launch { flow.collect {} }, which works as a "reference" that stays active until you cancel the resulting job.
TBD: The share operator might need some configuration with respect to how much "history" to keep in memory for "late collectors". So far it seems that one non-negative integer is enough (with zero, new collectors don't get any history; with one, only the most recent value; with more, the specified number of recent values). What is unclear is what shall be the default value (if any).
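To make the "one non-negative integer" semantics concrete, here is a plain-Kotlin model (no coroutines) of what a late collector would be replayed for a given history size. ReplayBuffer and historySeenByLateCollector are hypothetical names for this illustration, not library API.

```kotlin
// Hypothetical model of the "history" size parameter: a bounded buffer that
// keeps only the `size` most recent values for collectors that arrive late.
class ReplayBuffer<T>(private val size: Int) {
    init {
        require(size >= 0) { "history size must be non-negative, was $size" }
    }

    private val items = ArrayDeque<T>()

    fun record(value: T) {
        if (size == 0) return                       // zero: keep no history at all
        if (items.size == size) items.removeFirst() // full: drop the oldest value
        items.addLast(value)
    }

    // Values a newly arriving collector would be replayed, oldest first.
    fun snapshot(): List<T> = items.toList()
}

fun historySeenByLateCollector(size: Int, emitted: List<Int>): List<Int> {
    val buffer = ReplayBuffer<Int>(size)
    emitted.forEach { buffer.record(it) }
    return buffer.snapshot()
}
```

With emissions 1, 2, 3, a size of 0 replays nothing, 1 replays [3], 2 replays [2, 3], and any size of 3 or more replays all three. In the shareIn operator that kotlinx.coroutines later shipped, this integer became the replay parameter, which defaults to 0.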
UPDATE: It will actually have to be a shareIn(scope) operator. Otherwise, the scope it works in will be bound to the first collector, and when this collector is cancelled it will not be able to maintain emissions to other collectors.
This looks good as a #1086 implementation. One thing missing is the sharing timeout, whose use case is described in https://github.com/Kotlin/kotlinx.coroutines/issues/1086#issuecomment-485675107
One question about 'starting' a shared/published flow through launch { flow.collect {} }.
In #1086, starting a shared Flow could only happen on a Flow that was already shared, because connect() could only be invoked on a ConnectableFlow. Any subsequent subscribers/consumers/collectors of this shared flow would not restart the producer of the flow.
Say I want to create an extension function out of launch { flow.collect {} } called connect:
```kotlin
fun <T> Flow<T>.connect(scope: CoroutineScope) = scope.launch { collect {} }
```
I can't be sure that the receiver of the connect function is a shared/published Flow.
The argument for exposing a shared/published flow as a _separate_ type, as a ConnectableFlow, is that I can then write this extension function as follows:
```kotlin
fun <T> ConnectableFlow<T>.connect(scope: CoroutineScope) = scope.launch { collect {} }
```
and non-shared/non-published Flows won't be able to get 'connect'ed.
What happens to the cached "history" when moving from one to zero collectors (cancelling the upstream collection), then back to one? Does the history get preserved across upstream collections or lost?
@zach-klippenstein That is why Rx does not use share() for a shared cache, but cache(). The number of subscribers governs when the flow/stream starts, but it doesn't govern when the flow/stream stops.
With Rx's ConnectableObservable.refCount(), when there is at least 1 subscriber, the Flowable starts. When there are no more subscribers, the Flowable stops.
With Rx's ConnectableObservable.autoConnect(), when there is at least 1 subscriber, the Flowable starts, but even when there are no more subscribers, the Flowable continues. The 'history' won't get lost.
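A hedged way to see the refCount()/autoConnect() distinction in coroutine terms, assuming kotlinx.coroutines 1.4+ where shareIn takes a SharingStarted policy: WhileSubscribed() behaves roughly like refCount(), and Lazily roughly like autoConnect(), because it starts on the first collector and never stops until the sharing scope is cancelled. Under that assumption, share(0) and share(n) as discussed in this thread would correspond roughly to shareIn(scope, SharingStarted.WhileSubscribed(), replay = 0) and replay = n. The helper name below is made up.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Reports whether the upstream is still being collected after its only collector left.
fun upstreamAliveAfterLastCollector(policy: SharingStarted): Boolean = runBlocking {
    var upstreamActive = false
    val upstream = flow<Unit> {
        upstreamActive = true
        try {
            while (true) {
                emit(Unit)
                delay(10)
            }
        } finally {
            upstreamActive = false // runs when the sharing coroutine cancels the upstream
        }
    }
    val sharingJob = Job(coroutineContext[Job])
    val shared = upstream.shareIn(CoroutineScope(coroutineContext + sharingJob), policy)
    shared.first() // one collector connects, receives a value and disconnects
    delay(100)     // give the sharing coroutine time to react
    val alive = upstreamActive
    sharingJob.cancel()
    alive
}
```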
> What is unclear is what shall be the default value (if any).
To me, the name "share" implies only multicasting, so zero would be a reasonable default. The caching/replaying behavior is an additional feature that is often used along with multicasting, but not necessarily implied by the name.
@streetsofboston That's how Rx works, yes, but I was asking for clarification about this operator because I didn't see it specified in the issue description.
I believe share(0) as is proposed here would be equivalent to RxJava's share() (i.e. publish().refCount()), and share(n) would be equivalent to RxJava's replay(n).refCount().
The behavior of saving the cache across "connections" would be similar to Jake Wharton's RxReplayingShare.
> What happens to the cached "history" when moving from one to zero collectors (cancelling the upstream collection), then back to one? Does the history get preserved across upstream collections or lost?
I would argue that this should be an option. Preserving data across cancellation can be useful (for example, to use this flow as a memory cache if data takes a while to load after subscribing) or not useful (for example, when data from the flow changes frequently, rendering old values obsolete), depending on the context.
> What is unclear is what shall be the default value (if any).
I think that there shouldn't be any default value, but a startWith operator could be added for this, like in Rx.
@zach-klippenstein That's how I modeled my prototype implementation of the ConnectableFlow as well, for the share() and cache() functions: https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381
Roman wants to not expose yet another type (ConnectableFlow) from the library and only add one more function called share() for the most-used use-cases so that the Flow api doesn't become bloated. I agree with this sentiment. Keep it as small as possible.
And maybe the argument in favor of exposing a new ConnectableFlow type is strong enough (see my previous comment about a connect() extension function) that it will be added to the Flow API...
In the end it's up to the maintainers of kotlinx.coroutines to weigh these factors and make a decision :-)
However, I don't think overloading the proposed share() operator with caching functionality (share when _size == 0_, cache when _size > 0_) is a good idea. Better to spell out the functions' responsibilities and have a share() function without a _size_ parameter that shares and a cache() function with _size_ parameter that caches.
Maybe a separate, but related, issue should be opened for a Flow.cache operator.
@matejdro With the ConnectableFlow from my gist, either would be possible:
Cache survives re-activations of the flow:
```kotlin
flow.replay(size).autoConnect()
```
Cache does not survive re-activations of the flow:
```kotlin
flow.replay(size).refCount()
```
> This looks good as a #1086 implementation. One thing missing is the sharing timeout, whose use case is described in #1086 (comment)
Thanks for the timeout reminder. I'll see how it can be incorporated here in some nicer way than just adding an additional "timeout" parameter to this operator.
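In the design that kotlinx.coroutines eventually shipped (1.4+, assumed here), the timeout was folded into the start policy instead of becoming a separate parameter: SharingStarted.WhileSubscribed(stopTimeoutMillis) keeps the upstream running for that long after the last collector leaves. The sketch counts upstream (re)starts when a second collector arrives 50 ms after the first one left. upstreamStarts is a hypothetical helper.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts how many times the upstream is started when one collector leaves
// and another one arrives 50 ms later.
fun upstreamStarts(stopTimeoutMillis: Long): Int = runBlocking {
    var starts = 0
    val upstream = flow<Unit> {
        starts++
        while (true) {
            emit(Unit)
            delay(10)
        }
    }
    val sharingJob = Job(coroutineContext[Job])
    val shared = upstream.shareIn(
        CoroutineScope(coroutineContext + sharingJob),
        SharingStarted.WhileSubscribed(stopTimeoutMillis)
    )
    shared.first() // first collector connects and leaves
    delay(50)      // gap between the two collectors
    shared.first() // second collector
    sharingJob.cancel()
    starts
}
```

With a 1000 ms timeout, the gap is bridged and the upstream starts once. With a timeout of 0, it is stopped and restarted.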
> Say I want to create an extension function out of launch { flow.collect {} } called connect:
We actually plan this kind of operator, tentatively to be called launchIn(scope) for all kinds of flows. The only missing piece in this operator's design is user-friendly error-handling.
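launchIn did ship as Flow<T>.launchIn(scope): Job, so the dummy-collector trick can be written with it. The sketch below also assumes kotlinx.coroutines 1.4+ for shareIn. It uses such a collector as a keep-alive reference: the upstream stays active while the returned Job is alive and is stopped once that Job is cancelled. keepAliveDemo is a made-up name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (upstream active while the reference is held, upstream active after release).
fun keepAliveDemo(): Pair<Boolean, Boolean> = runBlocking {
    var upstreamActive = false
    val upstream = flow<Unit> {
        upstreamActive = true
        try {
            while (true) {
                emit(Unit)
                delay(10)
            }
        } finally {
            upstreamActive = false
        }
    }
    val sharingJob = Job(coroutineContext[Job])
    val shared = upstream.shareIn(CoroutineScope(coroutineContext + sharingJob), SharingStarted.WhileSubscribed())
    // The dummy collector ignores values but counts as a subscriber.
    val keepAlive: Job = shared.launchIn(this)
    delay(50)
    val activeWhileHeld = upstreamActive
    keepAlive.cancel() // release the reference
    delay(50)
    val activeAfterRelease = upstreamActive
    sharingJob.cancel()
    activeWhileHeld to activeAfterRelease
}
```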
> However, I don't think overloading the proposed share() operator with caching functionality (share when _size == 0_, cache when _size > 0_) is a good idea. Better to spell out the functions responsibilities and have a share() function without a _size_ parameter that shares and a cache() function with _size_ parameter that caches.
I don't see how to decouple share and cache functionality. It seems that you cannot cache without sharing, so we can just as well have a single operator with optional integer parameter that defaults to 0. No need to overload, because share() is conceptually same as cache(0).
@elizarov
> you can always launch a dummy collector: launch { flow.collect {} }
But the scope remains active as long as the job is running.
@fvasco
That's true. It can be cancelled, though.
And it will resume when, in case of share(), all subscribers/collectors stop collecting.
> I don't see how to decouple share and cache functionality
My prototype implementation decouples the two. It requires two private implementations of ConnectableFlow, though...
> My prototype implementation decouples the two. It requires two private implementations of ConnectableFlow, though...
I actually envision a single implementation where "replay size" (of zero or more) is just a configuration parameter.
That proposal looks good. It leaves me wondering about Throwable handling, though:
If you need to handle errors for all collectors in a shared flow (that is, handle emitter's errors), then you have to put error-handling operators _before_ you call share(). Btw, take a look at the error handling proposal: https://github.com/Kotlin/kotlinx.coroutines/issues/1263
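A minimal sketch of "put error handling before sharing", assuming kotlinx.coroutines 1.4+ and its shareIn operator. Because catch sits upstream of shareIn, the recovery runs once in the single upstream collection, and every collector sees its result. Without it, the exception would fail the sharing coroutine (and with it a regular sharing scope) rather than reach the collectors. replay = 2 is an arbitrary choice so that a late collector sees the whole short history.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun errorHandledOnce(): List<String> = runBlocking {
    var recoveries = 0
    val sharingJob = Job(coroutineContext[Job])
    val shared = flow<String> {
        emit("ok")
        throw IllegalStateException("boom")
    }
        .catch { e ->
            recoveries++ // runs in the single upstream collection, not once per collector
            emit("recovered: ${e.message}")
        }
        .shareIn(CoroutineScope(coroutineContext + sharingJob), SharingStarted.Lazily, replay = 2)
    val first = shared.take(2).toList()
    val second = shared.take(2).toList() // late collector, served from the replay cache
    sharingJob.cancel()
    first + second + "recoveries=$recoveries"
}
```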
My consideration is that the launch trick has non-trivial consequences.
This can be related to #1065
> I actually envision a single implementation where "replay size" (of zero or more) is just a configuration parameter.
You can design it to just have one "share" method with a replay-size parameter, but my point was that decoupling the share and cache use cases is possible. I'd argue for decoupling, but you can argue for one "share" method as well.
> Cache survives re-activations of the flow:
> flow.replay(size).autoConnect()
@streetsofboston This is slightly different. AutoConnect preserves the upstream connection as well. RxReplayingShare will still disconnect upstream when the ref count hits zero (like the refCount operator), but it will preserve the cache and continue to replay the cache to new subscribers. The latter is useful if the upstream subscription represents some expensive resource that should be shutdown if nobody is actively listening, but you still want to have some initial value to emit immediately if the upstream takes a while to "warm up", like @matejdro suggested.
In general I am wary of autoConnect in Rx because it's so easy to misuse and accidentally leak the upstream connection. Structured concurrency makes that a lot safer, since the connection requires the collection to happen in a particular scope.
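In the kotlinx.coroutines design that eventually shipped (1.4+, assumed here), RxReplayingShare-style behavior maps onto the replayExpirationMillis parameter of SharingStarted.WhileSubscribed. The upstream is stopped when the last collector leaves. By default (Long.MAX_VALUE) the replay cache is kept, while replayExpirationMillis = 0 clears it as soon as sharing stops. replayCacheAfterStop is a hypothetical helper.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns what is left in the replay cache after the only collector has left.
fun replayCacheAfterStop(policy: SharingStarted): List<Int> = runBlocking {
    val sharingJob = Job(coroutineContext[Job])
    val shared = flow<Int> {
        emit(42)
        awaitCancellation() // keep the upstream open until the sharing coroutine stops it
    }.shareIn(CoroutineScope(coroutineContext + sharingJob), policy, replay = 1)
    shared.first() // connect, receive 42, disconnect: the subscriber count drops to zero
    delay(100)     // by now the upstream collection has been stopped
    val cache = shared.replayCache
    sharingJob.cancel()
    cache
}
```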
> I think that there shouldn't be any default value, but startWith operator could be added for this like RX.
@matejdro The function of startWith, at least as Rx defines it, is unrelated to multicasting and caching, but I see what you meant. I read that open question as referring to the "default value" of the cache size parameter - zero, one, or something else - not the default value of the cache itself, which, as you said, would just be empty until the upstream emits something.
In @streetsofboston's proposal, share and cache are "sibling" operators with disjoint functionality, but both are composed of the same two steps: multicasting and automatic connection. I also prefer that naming because caching implies sharing, but sharing does not imply caching.
You could also make share and cache coordinate through operator fusion (like the recent change to buffer), but it's unclear what cache would do if not adjacent to share, since it doesn't make sense to cache for a single collector:
```kotlin
flow.share() // returns a multicasted flow
    .cache(1) // configures the multicasted flow to cache 1 emission
```
I don't mind using a single operator for both, as long as the operator name communicates that it does both multicasting and caching (e.g. shareReplaying).
Having a cache() operator for Flow would be misleading as you might use it incorrectly on any Flow. The fact that a flow is shared and has cache is an implementation detail, and we should avoid operators that make assumptions on the source Flow. Since cache only makes sense when the Flow is shared, and needs to be applied only on the sharing flow, not by the consumers, it should be a configuration, or an overload of the share operator.
How will the updated signature shareIn(scope) work with several dynamically connecting and disconnecting collectors? I.e., there are use cases for the share operator that require the flow to keep track of the scopes of its consumers. Here is a quote from a Slack message with such a use case:
What I'm looking for is to create a long-running operation that persists through multiple scopes. For example:
- Screen A (with Scope A) starts long operation
- User presses a button to switch to Screen B
- Screen B (with Scope B) also starts this long operation, but since this operation is already running, it would just somehow "add its scope"
- Screen A is closed, so Scope A that started the operation is cancelled. But since Scope B is still using this operation, it would not stop until Scope B is also cancelled.
shareIn(GlobalScope) would do the trick.
> shareIn(GlobalScope) would do the trick.
Would it close the flow if all of the consumers are cancelled? In terms of the above example that would mean canceling the long-running operation in the flow if both consumers, Screen A and Screen B, are closed.
@voddan Yes, that's the whole point of having a share operator for Flow instead of a ConflatedBroadcastChannel that you have to close manually.
Here is my use case.
I'd like to represent a stream of clicks as a Flow<Unit>. It should support multiple subscribers, while registering a listener on subscription (at least one subscriber), and unregistering on unsubscription (no more subscriptions). I believe in RxJava, this would be represented by publish().refCount() - but that naming is pretty terrible, and a lot could be improved here.
My UI might have the following interface:
```kotlin
interface MyUi {
    fun clicks(): Flow<Unit>
}
```
On Android, this means using setOnClickListener { .. }. This means that when the clicks(): Flow<Unit> is subscribed to, I want setOnClickListener { .. } to be registered. When it is unsubscribed from, I want setOnClickListener(null) to be called, so the listener is unregistered.
This scenario works great for one subscriber, but falls short when you want multiple subscribers. Take the following scenario (note: I know .onEach { } could be leveraged, but that reduces flexibility)
```kotlin
fun onCreate() {
    ui.clicks()
        .onEach { viewModel.send(ClickEvent) }
        .launchIn(onCreateScope)
    ui.clicks()
        .onEach { log.v("Ui clicked!") }
        .launchIn(onCreateScope)
}
```
viewModel.send(ClickEvent) now stops receiving events, because the second subscription replaces the click listener registered by the first one. I would like to be able to share one instance of Flow<T> between two subscribers, so that setOnClickListener { .. } is registered once for both (when the subscriber count goes from 0 to 1), and the listener is then unregistered via setOnClickListener(null) when the subscriber count goes from 1 to 0.
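One hedged way to get this behavior with operators kotlinx.coroutines later shipped: callbackFlow plus shareIn with SharingStarted.WhileSubscribed() (version 1.5+ is assumed, for trySend). FakeButton is a hypothetical stand-in for an Android View so the sketch runs on the JVM. On Android, setOnClickListener would be the real View method. The listener is registered once when the first collector arrives and cleared when the last one leaves.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a View: holds at most one click listener.
class FakeButton {
    var listener: (() -> Unit)? = null
        private set
    var registrations = 0
        private set

    fun setOnClickListener(l: (() -> Unit)?) {
        if (l != null) registrations++
        listener = l
    }

    fun click() { listener?.invoke() }
}

// Cold flow: registers a listener per collection and unregisters it on cancellation.
fun FakeButton.clicks(): Flow<Unit> = callbackFlow<Unit> {
    setOnClickListener { trySend(Unit) }
    awaitClose { setOnClickListener(null) }
}

// Returns (listener registrations, clicks received in total, listener cleared at the end).
fun clickDemo(): Triple<Int, Int, Boolean> = runBlocking {
    val button = FakeButton()
    val sharingJob = Job(coroutineContext[Job])
    val clicks = button.clicks()
        .shareIn(CoroutineScope(coroutineContext + sharingJob), SharingStarted.WhileSubscribed())
    var received = 0
    val viewModelJob = clicks.onEach { received++ }.launchIn(this) // e.g. viewModel.send(ClickEvent)
    val loggingJob = clicks.onEach { received++ }.launchIn(this)   // e.g. log.v("Ui clicked!")
    delay(50) // let both collectors subscribe and the listener get registered
    button.click()
    button.click()
    delay(50) // let both clicks reach both collectors
    viewModelJob.cancel()
    loggingJob.cancel()
    delay(50) // let the sharing coroutine stop the upstream
    val unregistered = button.listener == null
    sharingJob.cancel()
    Triple(button.registrations, received, unregistered)
}
```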
Hi folks, happy sunday. I wanted to check if any progress or timetable on this operator. Thank you kindly
Food for thought: customizing share behavior beyond the default could be controlled via an optional lambda. Example:
```kotlin
stateFlow.shareIn(scope) {
    autoConnect() // could be renamed `autoCollect`, `collectImmediately()`, `collectImmediate()`
}
```
downside would be the extra class definition.
One common "gotcha" when sharing:
```kotlin
stateFlow.shareIn(scope)
```
the first subscriber will get the replayed State value from the StateFlow<State>, but new subscribers won't. Could there be a strategy to potentially mitigate this? (I can't think of one, other than a lint warning)
@ZakTaccardi I was thinking about a separate operator for state flows that avoids this problems and returns a StateFlow. This way you can share chains of state computations. Moreover, this operator would be defined on a Flow so that you can first do state.map { ... } (which gets you a Flow) then "share the resulting state". It means, though, that we cannot reuse share name and have to invent another name for it which is a challenge.
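The operator described here later shipped as stateIn in kotlinx.coroutines 1.4 (assumed here). It is defined on any Flow and returns a StateFlow, so every collector, including late ones, immediately sees the current value. Below is a hedged sketch with a mapped state; the helper name is illustrative. The same stateIn(scope, SharingStarted.Eagerly, initialValue) call can also stand in for eagerly collecting a flow into a MutableStateFlow by hand.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun lateCollectorsSeeState(): List<Int> = runBlocking {
    val source = MutableStateFlow(1)
    val sharingJob = Job(coroutineContext[Job])
    val doubled: StateFlow<Int> = source
        .map { it * 2 } // a plain Flow<Int> after map
        .stateIn(CoroutineScope(coroutineContext + sharingJob), SharingStarted.Eagerly, initialValue = 0)
    delay(10)
    source.value = 5
    delay(10)
    // Two collectors that subscribe only now still get the current state.
    val late1 = doubled.first()
    val late2 = doubled.first()
    sharingJob.cancel()
    listOf(late1, late2, doubled.value)
}
```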
We needed a similar operator in Store and implemented a custom version.
Just linking here for reference.
https://paper.dropbox.com/doc/Store-Flow-Multiplexing--ApUW0sTj9Oyl7iRxyNBaBOK5AQ-GcFETeZI9pHEk2zmnHfW4
https://github.com/friendlyrobotnyc/Store/tree/master/store/src/main/java/com/nytimes/android/external/store4/impl/multicast
The implementation works by launching an actor when first downstream observer shows up.
Each downstream receives an unlimited channel where the Actor sends values that it received from upstream.
We wanted to avoid abusing the upstream when downstreams are slow to collect. To achieve that, each message is acked by the downstream when collected, and the upstream waits for an ack before collecting another value.
In terms of buffering, by default, we do not buffer. If a downstream arrives so late that it would not get any new values, we simply create a new producer. There is also an option to buffer.
We've also added piggybacking where downstream can be kept open even after upstream closes to receive new values if we end up creating another producer for another downstream (this is where it gets really custom to our use case).
The implementation is probably not great and more expensive than it needs to be, but it works for our use case.
Could someone please clarify what exactly would be the differences between this future shareIn() operator and the currently available .broadcastIn().asFlow() ? Thanks in advance.
The idea is that shareIn() will automatically start collection from the upstream when needed. The difference is that broadcastIn().asFlow() starts collecting immediately, so it becomes a "hot" stream that is always collecting the upstream regardless of the presence of downstream collectors.
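In the operators that eventually shipped (kotlinx.coroutines 1.4+, assumed here), the "hot from the start" behavior of broadcastIn().asFlow() corresponds roughly to shareIn with SharingStarted.Eagerly. The lazy behavior described above corresponds to WhileSubscribed() (or Lazily). The hypothetical helper below checks whether the upstream starts when nobody ever collects.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun upstreamStartsWithoutCollectors(policy: SharingStarted): Boolean = runBlocking {
    var upstreamStarted = false
    val sharingJob = Job(coroutineContext[Job])
    flow<Int> {
        upstreamStarted = true
        awaitCancellation()
    }.shareIn(CoroutineScope(coroutineContext + sharingJob), policy)
    delay(50) // no collector ever subscribes
    sharingJob.cancel()
    upstreamStarted
}
```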
Just wanted to say that share (refCount, replay and friends) is the last blocker for me to switch to Flow.
@ursusursus
> Just wanted to say that share (refCount, replay and friends) is the last blocker for me to switch to Flow.
My gist of #1086 may help you:
https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381
Migrating from RxJava, I just realized that this operator is not yet implemented in Flow. 😢
@GianfrancoMS I was trying to move from WebFlux to Flow and also ran into the issue. Is there an easy workaround, to at least have something that works?
> I was trying to move from WebFlux to Flow and also ran into the issue. Is there an easy workaround, to at least have something that works?
See my earlier comment; it may be of help for you:
https://github.com/Kotlin/kotlinx.coroutines/issues/1261#issuecomment-560980521
@GianfrancoMS
> Migrating from RxJava, I just realized that this operator is not yet implemented in Flow. 😢
If you do not mind making it “hot”, you can use broadcastIn.
It's working now. My use case is pub/sub where I get events in from Kafka, and there could be multiple clients that should get the same event.
https://github.com/openweb-nl/kafka-graphql-examples/tree/ge-kotlin/graphql-endpoint
I'd like to revisit the fusing/conflation of cache and shareIn.
It seems to me that the most common (Android) use-case for cache is the one of an exposed LiveData or ConflatedBroadcastChannel-backed Flow in a ViewModel. When a configuration change happens, you lose that data and have to start fresh... But how many times do we actually observe that LiveData from more than one observer? I'd say almost never.
If we were to get that multicasting ability for free, that'd be one thing, but it clearly isn't free.
Here's my own gist for a SharedFlow. gist
I think it makes much more sense to chain the operators: myFlow.cache(1).shareIn(scope). With my implementation, I don't see a real need to do a fusion like with buffer and context.
Here's a gist for a cold cache(size: Int) Flow using a CircularArray: gist
I don't love cache(size: Int) as the operator name. I feel like the English meaning is far too similar to buffer and can lead to lots of confusion and mis-use. replay(size: Int) feels much more distinctive to me.
If no one has strong objections, I can get them PR-ready in a few days.
@digitalbuddha did the trick here https://github.com/dropbox/Store/blob/master/multicast/src/main/kotlin/com/dropbox/flow/multicast/Multicaster.kt
Here is my take on the share operator: https://gist.github.com/matejdro/a9c838bf0066595fb52b4b8816f49252
It supports conflating and timeouts described in https://github.com/Kotlin/kotlinx.coroutines/issues/1086#issuecomment-485675107. From what I see, implementation is a bit simpler than above links (it just uses an actor that loops through events), but it is not very fast - I would not use it to emit thousands events per second. However, in my projects I almost never need that big of a throughput, so it fits my use cases.
Hi @matejdro, I have seen your gist, and it looks good.
What do you think of using an extension function in this loc? https://gist.github.com/matejdro/a9c838bf0066595fb52b4b8816f49252#file-multicastflowtest-kt-L139
Yes, that would probably be a good idea.
I've also come across a use case for this. Right now, the only way to do it is to collect the flow imperatively and prematurely.
I just noticed this one:
> It will actually have to be a shareIn(scope) operator. Otherwise, the scope it works in will be bound to the first collector, and when this collector is cancelled it will not be able to maintain emissions to other collectors.
I think this unnecessarily complicates the use of this feature. Flow is already scoped by the downstream (it starts collecting when first collector starts and stops when all collectors get cancelled). Introducing another scope would significantly complicate implementations where such share operator is exposed by the stateless objects.
It definitely makes the stream more complicated to reason about, but I don't think it does so unnecessarily. Multicasting inherently involves a scope that is separate from that of any of the downstream collectors, because no downstream collector is guaranteed to be active for the lifetime of the upstream collection. That scope could be implicit, as in the case with a reference counting approach, but sometimes you need it to be explicit: if the owner of the multicasting operation has its own scope (e.g. is a service-type object that gets shutdown when a user logs out of an app), it may want to explicitly manage the lifetime of that upstream collection and cancel it when the service object itself is shutdown.
I see the purpose here. So we would need support for both explicit and implicit scopes?
@zach-klippenstein Issue https://github.com/Kotlin/kotlinx.coroutines/issues/1086 discusses such a class for explicitly managing the scope: ConnectableFlow.
Instead of the fun connect(CoroutineScope): Connection shown there, though, it just could have fun launchIn(CoroutineScope): Job, like a regular Flow, to manage its lifecycle. For the share() operator, it is then a question of managing that 'shared' CoroutineScope based on reference-counting the number of subscribers.
Scoping by downstream is not enough. We need the _real scope_ to provide the context for pulling upstream data. We cannot just take the context of the first downstream subscriber. It can get destroyed, while other subscribers should not be suffering. In a case where you don't really care, you can always use shareIn(GlobalScope).
📣 Asking here a question to the community interested in this issue. There are tons of use-cases listed here and in #1086, but they all revolve around working with never-ending streams of incoming events or state updates. However, a Flow (just like other reactive streams) does not have to be infinite. It can _complete_. It can produce a number of events and stop. Disregard the question of failures for a while. Let's discuss just a regular completion. Assume that we don't retry either and are not just polling a non-streaming data source. So we have upstreamFlow.shareIn(scope) and the upstream flow completes.
Now, in Rx there is a replay operator that records all the upstream emissions and replays them to each downstream, completing the replay when the upstream had completed. I understand how replay works, but why would anyone ever need it? What are the actual use-cases for replay operator in real-life application? More generally, what are use-cases for having a stream that _completes_ (so it is not an infinite stream of updates) and which you want to share among multiple downstream consumers at the same time?
> What are the actual use-cases for replay operator in real-life application? More generally, what are use-cases for having a stream that _completes_ (so it is not an infinite stream of updates) and which you want to share among multiple downstream consumers at the same time?
For me, having a share/replay operator would allow implementing some sort of if/else logic with flows, i.e. distributing elements into multiple flows based on criteria.
For example, I need the first 100 items with foo == "Bar" in flow a and all other items in flow b.
With share/replay:
```kotlin
var flow = //...
flow = flow.shareReplay() //or just share()
val a = flow.filter { it.foo == "Bar" }.take(100)
val b = flow.filter { it.foo != "Bar" }
```
In this case, I don't want to execute the input Flow multiple times (which could be a file consisting of a couple of GB of data on a network drive etc.).
So shareReplay should be able to transparently handle the "take" (i.e. don't block the other flow) and avoid evaluating the input flow multiple times.
For reference: This is the share implementation I ended up using, which seems to fulfill my requirements (in my case parsing a binary file with a dynamic schema and converting it to CSV; for this, I need to "split" the flow to "guess" the column headers based on the first X entries, but then need all entries again to actually do the parsing and writing):
(I wasn't able to get rid of the consumerCount parameter, because in my case I collect the first flow before starting the second one)
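```kotlin
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.DEFAULT_CONCURRENCY
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.broadcastIn
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import mu.KotlinLogging

private val logger = KotlinLogging.logger {}

// Shares one upstream collection among exactly `consumerCount` collectors.
// broadcastIn/BroadcastChannel were the tools available at the time; later
// kotlinx.coroutines releases deprecated them in favor of shareIn.
fun <T> Flow<T>.share(consumerCount: Int, bufferSize: Int = Channel.BUFFERED, scope: CoroutineContext? = null): Flow<T> {
    if (this is SharedFlow)
        return this
    logger.debug { "Sharing flow" }
    var broadcastChannel: BroadcastChannel<T>? = null
    var channels: List<ReceiveChannel<T>>? = null
    val lock = ReentrantLock()
    val pendingConsumers = AtomicInteger(consumerCount)
    val index = AtomicInteger(0)
    return SharedFlow(flow {
        lock.withLock {
            if (broadcastChannel == null) {
                logger.debug { "First consumer found. Creating channel and flows" }
                broadcastChannel = buffer(bufferSize).broadcastIn(CoroutineScope(scope ?: coroutineContext))
                // Open every subscription up front, so a consumer that starts later still sees all elements.
                channels = (0 until consumerCount).map { broadcastChannel!!.openSubscription() }
            }
        }
        // Assumes at most consumerCount collectors; an extra one would fail with an index error here.
        val curIndex = index.getAndIncrement()
        val receiveChannel = channels!![curIndex]
        val flow = receiveChannel.consumeAsFlow()
        logger.debug { "Flow $curIndex is now being consumed" }
        try {
            logger.debug { "Starting emitting (pendingConsumers is ${pendingConsumers.get()})" }
            emitAll(flow)
        } finally {
            receiveChannel.cancel()
            // The last consumer to finish cancels the broadcast, and with it the upstream collection.
            if (pendingConsumers.decrementAndGet() == 0)
                broadcastChannel!!.cancel()
            logger.debug { "Finished emitting (pendingConsumers is ${pendingConsumers.get()})" }
        }
    })
}

// Marker wrapper (predates kotlinx.coroutines' own SharedFlow; rename if both are in scope).
private class SharedFlow<T>(fl: Flow<T>) : Flow<T> by fl

fun <T, R> Flow<T>.flatMapItems(callback: (T) -> Iterable<R>): Flow<R> = flatMapConcat { callback(it).asFlow() }

inline fun <T> Flow<T>.parallelFilter(concurrencyLevel: Int = DEFAULT_CONCURRENCY, crossinline predicate: suspend (T) -> Boolean): Flow<T> {
    return flatMapMerge(concurrencyLevel) { value ->
        flow { if (predicate(value)) emit(value) }
    }
}

inline fun <T, R> Flow<T>.parallelMap(concurrencyLevel: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (T) -> R): Flow<R> {
    return flatMapMerge(concurrencyLevel) { value ->
        flow { emit(transform(value)) }
    }
}
```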
I work on a fairly large mobile app, and we use replay(1) (i.e. stateIn) all the time, but I don't think I've ever seen an unbounded replay in our codebase.
@matthiaswelz Thanks for your use-case. Note, that in your use-case you are not actually "widely sharing" the flow. You don't actually publish the shared flow for later consumption by an unknown number of future collectors. On the contrary, here you know, in advance, that there are going to be two downstream flows that are going to process your upstream flow. In this case, it looks like Rx-like replay is a bit of overkill, since it actually caches all the data _forever_ so you'll be running out of memory if the data stream is very long, you are not getting advantage of the streaming nature of data producer here to minimize memory consumption.
I think that your use-case of "splitting the flow in a few other flows" should be covered by a separate operator which we can tentatively call replicate. See this comment with a strawman example: https://github.com/Kotlin/kotlinx.coroutines/issues/1086#issuecomment-585195388
> I work on a fairly large mobile app, and we use replay(1) (i.e. stateIn) all the time, but I don't think I've ever seen an unbounded replay in our codebase.
@zach-klippenstein But do you ever call replay(1) on an upstream flow that is finite and completes at some moment of time?
There is a design for SharedFlow that provides most of the underlying framework to implement the actual sharing operators. See #2034
This issue is superseded by the worked-out design of sharing operators. See #2047
While it's not released, a good-enough workaround is this simple extension:
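```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun <T> Flow<T>.cache(scope: CoroutineScope, default: T): Flow<T> {
    val stateFlow = MutableStateFlow(default)
    // Collects the upstream eagerly, for as long as `scope` is active.
    scope.launch {
        collect {
            stateFlow.value = it
        }
    }
    return stateFlow
}
```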