Unlike point-to-point channels that are somewhat tricky to use, various kinds of BroadcastChannel implementations seem well suited to directly implement Flow interface. That would make easier and slightly more efficient to use them as "data model" classes in MVVM architectures, being able to directly use full set of flow operators to transform them and wire to UI.
Also, we might consider a shorter name for ConflatedBroadcastChannel that would focus on its "dataflow variable" nature. DataFlow could be a good starting point for this name bike-shedding.
What about StateFlow? I like the idea that "state" is repayable
Having thought about it a bit more, it looks the whole BroadcastChannel is a misnomer. They are not really channels! They are more like some kind of "hot flows".
+1 to DataFlow
Will this also mean that internal implementation of DataFlow will be changed to be better for MVVM/reactive use case (yes, I'm talking about #395)
I've said it before and will say it again. ConflatedBroadcastChannel IMHO is a really powerful tool for us developers. This could make it even better.
The nice thing about ConflatedBroadcastChannel is that it handles the subscription to the channel and the subscribers for you in a simple and safe manner - it's like messaging. It's very easy to use.
The only thing I'm missing is an easy way to control the storing of the state. I have a usecase where I want to push "an Event" to all subscribers but I don't want this Event stored.
I do think that a broadcast channel with the capacity of zero is missing too (like publish subject in RX) .
@adibfara What is your use case for such a zero-capacity broadcast channel?
@adibfara What is your use case for such a zero-capacity broadcast channel?
@elizarov What I meant was, currently I found no way to have a broadcast channel (a channel that does not suspend for writing), that does not emit the value that is holding to the anyone that opens a subscription.
The use case could be the need to listen for events (as in change) of something, and we only need the updates.
The pipe that is holding the value inside the ConflatedBroadCastChannel, might have a very old data, and we open a subscription to it, we get the data, even though we might already have a _newer_ data that has not been pushed to the channel (because we don't want to broadcast it), and we only want to be updated if the broadcast channel emits it to us.
If this seems interesting, I can move it to another thread/issue since it might be unrelated to the title of this issue.
@adibfara Let's keep this discussion here for now because it is important to take into account for this issue of aligning broadcast channels with flows. It is important to learn more about your use-case, as it related to, at least, proper naming of these concepts. Let me try to rephase how I'm getting this particular use case for zero-capacity (not conflated):
Say we have some classical callback-based even-subscription API:
obj.addEventListener(listener)
...
obj.removeEventListener(listener)
This API is quite error-prone as it is easy to forget removeEventListener, so instead we can represent it is a flow, which give us the advantage of structured concurrency, so we can just write:
launch { // in proper scope
eventFlow.collect { event -> doSomething() }
}
Moreover we already have an API for that (current called flowViaChannel, see its docs) that lets you adapt existing callback-based API to the flow. But what if we don't have existing callback-based API. We have only a piece of code that emits those events and we don't want to spend time implementing and managing all those "event listeners list". What we want is something that _is_ a Flow, but at the time where we can simply reports events to it and make them delivered to all active "listeners":
val eventFlow = EventFlow()
fun onEvent(event: Event) {
eventFlow.emit(event) // all collectors receive it
}
The problem that we have is back-pressure and context management. The collectors (listeners) maybe slow and busy we something else at this moment. So what should eventFlow.emit do?
Here is how ArrayBroadcastChannel works now. You can create val bc = BroadcastChannel(1) and then the first call to bc.send(event) completes immediately and starts sending it to all the listeners, but the subsequent call to bc.send(event) would suspend if the previous event was not processed yet.
Is this one-element-channel enough? Do you really have a use-case where eventFlow.emit/bc.send must suspend until all the collectors have processed the event? Why you might need this behavior? Under what circumstances capacity=1 may not suit you?
The way I currently deal with "hot flows" exposed as ReceiveChannels (usually from a BroadcastChannel that may be conflated is by using a variant of consumeEach that cancels the coroutine consuming a value when a new value is received.
You can see how I implemented the consumeEachAndCancelPrevious extension here:
I use it a lot in my projects, that is my alternative to repeating callbacks that dispatch a state or events, and it allows me to ensure proper cancellation when the state has changed.
I did not use flow in its current form yet because it doesn't seem to support this kind of usages, and that's what I'm need a lot in the project I'm working on.
@elizarov Thanks for the info. The reason I suggested moving it out of this conversation was that with my understanding, what I'm suggesting is somewhat of a _hot_ and maybe endless stream of values, not a cold one.
The problem that we have is back-pressure and context management. The collectors (listeners) maybe slow and busy we something else at this moment.
How this is handled with the ConflatedBroadCastChannel? I was under the assumption that the sender, is not concerned about back-pressure and the context of the channel's listeners.
Here's what I think the current APIs's, from a cold/hot standpoint, so correct me If I'm wrong:
Channels:
Hot, _send_ and consumers both suspend, so senders are back-pressered because of slow receivers. Each item is consumed only once.
Flow:
Cold, like channels, receivers suspend, waiting for the flow to finish. Stuff cannot be send to a flow from outside of it, because they should be _built_, like channel's produce. Unlike produce, gathering the data does not start immediately, and starts upon subscription. Each item is consumed once.
BroadCastChannel:
Hot, each item gets consumed by every consumer and each sender suspends until everyone has consumed the item.
ConflatedBroadCastChannel:
Hot, only consumers suspend, waiting for the next value, and immediately get the _current_ value upon subscription. Senders can _only_ access the channel from outside of it and send stuff into it and do not suspend. Each item _might_ get consumed by all consumers (if they are not busy), and sending items to the consumers does not need back-pressure handling.
Broadcast (the thing that I was talking about):
Hot, consumers (0 to n) suspend for the next value, and do _not_ get the current value of the stream upon subscription. Everything else is exactly like ConflatedBroadCastChannel. Senders do not care about this item's consumption, so If a new item arrives before previous item is consumed by all consumers, It would get lost unless it is stored in an array or something (with capacity). So the only difference with conflated broadcast channel, is the initial submission upon subscription. Currently, I see this broad cast channel as a general conflated broadcast channel, where it would store the last n items, and emits them to anyone that opens a subscription, and the old conflated broad cast channel is a special case with the capacity of 1. So the capacity of 0 would mean it will not store anything, and a capacity of 5 would mean it would store the latest 5 values. This is like how Rx's subject behave, in which, if you subscribe to a PublishSubject you will only receive items that get emitted after your subscription, and if you subscribe to a BehaviorSubject, upon subscription, you will get the stored value, and the next items after wards. This goes the same for ReplaySubject which takes a capacity parameter, so BehaviorSubject is actually a ReplaySubject(1).
Do you really have a use-case where eventFlow.emit/bc.send must suspend until all the collectors have processed the event?
I do not need the bc.send to wait for every collector to consume it.
To add to the BroadcastChannel(capacity = 0) discussion, in the Guide to Reactive Streams with Coroutines document you state that BroadcastChannel(capacity) corresponds to Rx's PublishSubject, but that's not true.
ArrayBroadcastChannel contains a buffer and emits all items in the buffer to any new subscriber. That sounds more like a ReplaySubject.
Furthermore, the ArrayBroadcastChannel (and all other channels, as far as I can see) block the sender if the buffer is full instead of dropping the oldest items.
In my use case I'm looking to send events to my UI that are only important if there is a subscriber when the event is fired. For instance, if I need to display a toast notification relating to a screen, but the screen is paused or obstructed, there's not reason to show that notification, and it wouldn't make sense to show it later when the screen resumes (and resubscribes) since the context is lost.
Effectively I would expect the BroadcastChannel(capacity = 0) to create a pipe that I can throw data down, without the sender caring who's on the other end. If no one is there, discard the data. If someone is there, give them the data, but don't give them historical data.
A correction here, despite the fact that ArrayBroadcastChannel contains a buffer it _does not_ emit to new subscribers, so it does not do replay.
Would not ConflatedBroadcastChannel work for your use-case?
@skaiver You should avoid GlobalScope and use viewModelScope as shown in Android doc.
Also, experimental coroutines are obsolete since Kotlin 1.3 because now, we have stable coroutines.
About your question it'd be better suited for Kotlin's Slack or StackOverflow as I think it goes beyond the scope of this issue.
BroadcastChannel creates an ArrayBroadcastChannel if you pass 1 or more.
On Thu, Jun 20, 2019, 6:35 PM Tim notifications@github.com wrote:
@skaiver https://github.com/skaiver Вы должны избегать GlobalScopeи
использовать, viewModelScope как показано в документе Android
https://developer.android.com/topic/libraries/architecture/coroutines .Кроме того, экспериментальные сопрограммы устарели после Kotlin 1.3,
потому что теперь у нас есть стабильные сопрограммы.Что касается вашего вопроса, то он лучше подошел бы для Slack или
StackOverflow Kotlin, так как я думаю, что он выходит за рамки этой
проблемы.My question is not related to the implementation of the viewmodel as this
is just an example.
My question is how can I use a ArrayBroadcastChannel in this example
ArrayBroadcastChannel became unavailable—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
https://github.com/Kotlin/kotlinx.coroutines/issues/1082?email_source=notifications&email_token=ABVG6BOOIBHSYNIPUDGUNUDP3OWWPA5CNFSM4HFAKMMKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGODYF6MKY#issuecomment-504096299,
or mute the thread
https://github.com/notifications/unsubscribe-auth/ABVG6BO3B5CJC3NRI3VME5TP3OWWPANCNFSM4HFAKMMA
.
Take a look at #1354 please. It is a proposed flow-based analogue to ConflatedBroadcastChannel that makes it unnecessary to implement Flow interface in it. The tentative name is DataFlow, because it _is_ a Flow that is designed to be used as a data model.
Let's bikeshed a name:
DataFlow. The name of the current prototype (#1354). It is a play on the LiveData class from Android and the fact that it is going to be used to represent a "_data_ model" in UI and forms a basis of "_data_ flow" programming model.ValueFlow. Plays on the fact that this flow has value property that can be read to check what is "the current _value_" that is kept by this flow and can be updated.StateFlow. Represents the fact that this flow represents "an application _state_" that can be updated. Being a _state_ it is natural to be interested only in the most recent, conflated, value of it.Note, that the other flow class like this will be most likely called EventFlow and will be designed for "event streams" (like mouse or button clicks) which should not be conflated and where the most recent value does not make any sense. There is not much bikeshedding about its name, since it is quite a good fit. So DataFlow/ValueFlow/StateFlow name should mesh well and be clearly distinct from EventFlow name.
StateFlow is most incisive:
val, implying something unchanging, or constant, which this is not.I don't like StateFlow name, "an application state" looks too domain-specific for universal primitive that may be used for many use cases not only for keeping some state
Data/Value looks fine, don't see a big difference between them.
Flow implies data in an untyped
A bit strange for, why Data is untyped? why is State typed in this case?
@chris-hatton "Value" doesn't imply the value is unchanging.
You can perfectly have a val that always return a different value on each get. It's read-only, which is valid in this case.
Also, it's "ValueFlow", and still makes sense so long you don't drop "Flow" from it.
ValueFlow is great because it can imply there's always a value, and that only the last value makes sense.
The word "Data" in DataFlow doesn't imply singularity so much to me as we could reason about it for all the data coming in the flow, not the latest.
I also voted StateFlow, but I agree with @gildor, apps that will be using shared Flows (#1261) too for state will have a hard time getting distinct names when there's also StateFlows in the file, or codebase.
My two cents: from the current choices, I cast my vote for StateFlow:
IMO other names don't hint about the flow being conflated and the possibility of skipped values. As I see it, we're looking for something that communicates the "only-the-last-value-matters-flow" idea nicely: commonly true for State, somewhat true for Value, not really as much for abstract Data -- we usually want all of.
I'd like to contribute some more options for the sake of brainstorming, not necessarily good but maybe they'll bounce off someone and bring out something else (feel free to boo or delete of course, and sorry for the length):
CurrentFlow -- as in, you're only getting the current something, whatever you missed is not relevant. The obvious disadvantage is it will probably cause a mental collision with "the flow that we're using right now"Current -- even more concise, but probably not having the Flow at the end is a bad idea, plus way too much potential to get mixed in with lots of other stuff from different namespaces. The cool thing is, "flow" and "current" are strongly related words in the context of fluids, but in our case it also has the additional "right now" connotationConflatedFlow -- I don't see it in the comments, I suppose it's not to the point for some reason?SquashFlow -- somewhat to the point IMO, but has a little bit of "flatten" connotation to it (from git squash)ShallowFlow -- as in, "no depth", does hint the "current/last value only" a bitOmniFlow -- as in, "for all", carries the "broadcasted" part of the idea across a little bit, although also suggests "all-encompassing"UniFlow -- deriving from latin "one", commonly perceived as "something single for many somethings" (e.g. universal ~ one fit for many purposes), or may hint "unified" as in "encompassing everything that matters from a multitude of something"Neither DataFlow nor ValueFlow imply that the value is replayed and always available - events are "data" and "values" too. I voted for StateFlow because it is the only one that implies this property for me, even if it's not used to represent "application state" in all cases.
Another two cents: @zach-klippenstein 's comment just made me realize the common thing for the three names that makes them not really ring the bell with me personally: they all impose something on the type of data to represent, while IMO the name should describe the properties of the flow itself without regard to what actual data it's going to handle. E.g the name ConflatedBroadcastChannel only hints on how it works not what it's supposed to work with.
To elaborate: _You see ConflatedBroadcastChannel and you think well "conflated" means only 1 value, "broadcast" means value can be consumed multiple times, all clear. You see "ValueFlow" you think "well, it probably provides values" which doesn't make it stand apart from any other flow. You see "StateFlow" and you think "well, it provides state. What can I imply from 'State'? It's something that's always there and of what there's only one valid instance at any moment -- this is why IMO StateFlow is the best out of the three but still makes you imply something from what's supposed to be an generic parameter_
I vaguely understand why this isn't really a channel (at least in terms of the name), but why is it really a flow?
Sure I can easily see this implementing SendChannel<T> and Flow<T>; but I don't see it as either.
Perhaps a name that is more along the lines of LiveData or ConflatedData would be more appropriate?
You could interpret StateFlow to say it "provides state", but you could also interpret it as "having state" or "being stateful", meaning that it remembers its current state and provides that. That's why that one makes the most sense to me.
@gildor
A bit strange for, why Data is untyped? why is State typed in this case?
A connotation of the word Data is that it's unstructured, raw, lacking context and not informative by itself. State does not specify that it _is_ typed either, but it is simply neutral in this regard, whereas Data is not. See 'Data' on Wikipedia for reference to 'data' not carrying context and being the 'least abstract' form.
I often struggle to communicate about the differences between a "state" (repayable) and an "event" (not repayable), so StateFlow and EventFlow would be a dream come true for me
@ZakTaccardi Repayable or Replayable?
I like StateFlow because it contrasts nicely with EventFlow. I agree with a lot of the other arguments for it above, but I'll expand on my thoughts too.
_States_ and _events_ are familiar terms used together in other areas of computer science, and they line up with my understanding of how proposed classes will work. The _state_ is something that exists at any point in time, whereas _events_ are discrete things that happen at a single point in time. If you observe the _state of the system_, you see the current state and any future ones; if you observe the _events emitted_ then you see any events from this time on.
As an example, this Wikipedia page Finite-state machine uses these words together:
A _state_ is a description of the status of a system that is waiting to execute a transition. A transition is a set of actions to be executed when a condition is fulfilled or when an _event_ is received.
Looking at the words individually, the Wikipedia article on _state_ starts off saying:
..a program is described as stateful if it is designed to remember preceding events or user interactions;[1] _the remembered information is called the state of the system_.
A key point is that state is "remembered". _Data_ and _value_ are more overloaded words with lots of different meanings, and do not seem to me to be as obviously contrasted to _events_. Wikipedia defines _data_ as:
Data ... is any sequence of one or more symbols given meaning by specific act(s) of interpretation.
This is very broad. _value_ is defined as:
a value is the representation of some entity that can be manipulated by a program
but the proposal does not manipulate the value; it emits a new one.
The Wikipedia article Event-driven architecture also uses the terms _event_ and _state_. I know Wikipedia is not the most authoritative source, but it does line up with my understanding and usage of these words.
I agree with keeping StateFlow for the flow that holds the latest value, since it literally represents the latest state.
As per the other one that does not hold the value, Event sounds nice, although I think it would it imply that it's only representing _event_s. How about StreamFlow?. It represents a stream of data and _stream_ does not imply that this flow would hold the data. (the name also goes nicely with flow :) )
I want to recover #274 issue.
Channel provides both read and write interface, BroadcastChannel does not.
Flow provides only read-only interface, which is really good and safe, we should consider to provide a read-only interface for DataFlow (or different name) also.
Here a simple use case with random names, the argumentations are largely discussed with @jcornaz in #274 and related.
interface ValueFlow<T> : Flow<T> {
val value: T
}
interface DataFlow<T> : ValueFlow<T> {
override var value: T
}
class Thermometer {
private val temperatureData = DataFlow<Float>()
val temperatureFlow: ValueFlow<Float> get() = temperatureData
private fun updateLoop() {
temperatureData.value = 42
}
}
I want to recover #274 issue.
Channel provides both read and write interface, BroadcastChannel does not.
Flow provides only read-only interface, which is really good and safe, we should consider to provide a read-only interface for DataFlow (or different name) also.
Here a simple use case, the argumentations are largely discussed with @jcornaz in #274 and related.
Finally I propose to provide a DataFlow to FlowCollector adapter.
@fvasco What's you're use case for your ValueFlow proposal (I didn't see any actual use cases described in the issue you linked)? In my experience, when an API exposes non-reactive access to a reactive stream, it's because some consumer was trying to do something imperatively that should be done reactively with stream operators instead. I'm worried pulling out a ValueFlow interface like you've proposed would encourage code to be written that isn't as reactive as it could be. Not having anything like ValueFlow makes people think twice when they're doing that and often results in a discussion about how to do things more reactively.
Legacy code sometimes needs direct access to current values, but that can already be done with a simple subscribe-and-cache, and the additional friction serves as a good reminder that the old code needs updating.
Hi @zach-klippenstein,
I get the point.
Thermometer should be:
class Thermometer {
private val temperatureData = DataFlow<Float>()
val temperatureFlow: Flow<Float> get() = temperatureData
private fun updateLoop() {
temperatureData.value = 42
}
}
Hi @zach-klippenstein,
I wish to reconsider our recent consideration about Flow and DataFlow usage.
For some use case that solution can be impractical, I think to use DataFlow to store a mutable configuration.
The main Flow's issue is the high cost of first() operation, really much expensive that BroadcastChannel.value.
I currently use a custom implementation of _observable resource_ to get the current value _and_ the further updates.
In some our use case a part of configuration is required to execute a function, so we have to await the first configuration value, in other our use case an update of the same configuration dispatches an event.
I cited issue #274 because we need a builder like receiveBroadcastConflatedChannel { ... }, unfortunately I suspect that this issue does not help our use case.
In some our use case a part of configuration is required to execute a function
Why is it required? This means that code isn't going to be reactive.
@zach-klippenstein you are right.
In this really simple example the code is not reactive
var roles = setOf("admin")
fun checkAccess(role:String){
check(role in roles)
}
However we want to update the roles variable and, for example, fire an event to recheck all sessions.
Still not sure why this needs non-reactive access to the value. If you're firing an event to trigger the checks, you must already be subscribing to the roles flow anyway. So you could do something like:
suspend fun checkAccess(role: String) {
rolesFlow.collect { roles -> check(role in roles) }
}
// and/or
fun checkAccessIn(role: String, sessionScope: CoroutineScope) {
sessionScope.launch { checkAccess(role) }
}
where sessionScope is some scope that is tied to your session lifetime, and so the session will be failed if the check fails.
Hi @zach-klippenstein,
I my hypothetical use case a configuration (role) can be updated in any time in the future, instead checkAccess must check a role using only the _current_ configuration.
checkAccess can check role using rolesFlow.first() only, further calls using the same role can fail or not, depending by further configurations.
rolesFlow is an infinite flow, a collect can loop forever and sessionScope hangs.
I still don't understand what part of your design prevents doing everything reactively, but even if that is a legitimate need, providing non-reactive access to data that can change is often a source of bugs later, because new code isn't forced to deal with the fact that the value can change.
rolesFlow is an infinite flow, a collect can loop forever and sessionScope hangs.
Yes, you'd need to cancel the continuously-running access checking job when the session finishes.
Hi @zach-klippenstein,
I try to write a better example.
I use a stock exchange price for my factory: factoryPriceFlow
val factoryPriceFlow: Flow<Double> = flow { TODO() }
I want to use this flow to update the current price on user interface:
factoryPriceFlow
.onEach { currentPrice -> updatePrice(currentPrice) }
.launchIn(myScope)
Moreove I want to implement an action to buy a stock using _the current price_
suspend fun buyStocks(quantity: Int) {
val currentPrice = factoryPriceFlow.first() // get current value
newCreditCardTransaction(currentPrice * quantity)
fetchStock(quantity)
}
buyStocks is not reactive, do you have any suggestion regarding it?
I had considered a new operator to get the latest value, but I don't have any valid proposal to build an unscoped version of it.
val latestFactoryPrice by factoryPriceFlow.latest()
I think we're getting a bit into the weeds here about general architecture so maybe we should move this convo to Slack? But that makes sense.
If buyStocks is not a static/top-level/object function, then it already has an implicit scope (in the abstract sense, not CoroutineScope), even if that scope is "the lifetime of your app". If you make that scope _explicit_, either tied directly to a Job or with simple start/stop methods tied to a private Job/CoroutineScope, then you can implement your latest delegate (taking the scope as a parameter) or just manually cache the latest value in a private property.
Another approach would be to make your buyStocks function just another flow, and let whoever is invoking it worry about the correct scoping:
typealias BuyStocksAction = (quantity: Int) -> Unit
val buyStockActions: Flow<BuyStocksAction> =
factoryPriceFlow.map { currentPrice ->
{ quantity -> buyStocksAtPrice(quantity, currentPrice) }
}
private fun buyStocksAtPrice(quantity: Int, currentPrice: Double) {
…
}
// Then in your UI layer, if you're using something like RxBindings, and #1498 existed:
val buyActions = quantityView.values.withLatestFrom(stockManager.buyStockActions) { quantity, action ->
{ action(quantity) }
}
purchaseButton.clicks.withLatestFrom(buyActions) { _, buyAction -> buyAction() }
.launchIn(viewScope)
But both of these approaches might seem like a lot of extra boilerplate just to avoid exposing the current price as a simple property. Are they actually better? I would argue yes, because the concept of "factory price" is inherently reactive, so the API should reflect that. The need to sample the reactive stream at a specific instant in time is really only a consequence of the view framework you're working in not being inherently reactive (e.g. Jetpack Compose will likely make this a lot nicer). And because that is a requirement of the view layer, that's the layer that should be responsible for bridging reactive and non-reactive code.
Zooming back out from the specifics of this example, another reason that exposing "current value" as part of your API isn't ideal is composability. Smaller API surfaces are easier to compose together than larger ones. If your consumers start depending on your current value directly, then it is harder to add intermediate components between them later. Those components would need to continue to expose both a reactive stream and their current value, so there are now two parallel paths through which data is flowing (through the stream and through the chain of current values). This is just more code to write, but also means you need more tests to cover both code paths.
Once you've started designing your code to be reactive first, it's much simpler to keep _everything_ reactive until you absolutely have to – which is typically at the view layer.
Hi @zach-klippenstein,
my issue with the scope occurs when latestFactoryPrice is an instance property, so the scope is the instance lifetime.
If you make that scope explicit
...then I am forced to implement a dispose method in my class and in any classes where it is used, I consider this solution inconvenient in a _garbage collected language_.
If a developer forgets to dispose an instance then his creates a memory leak, moreover that variable will update forever.
just manually cache the latest value in a private property
this is a workaround on library miss, we are discussing on it, moreover it is not possible to collect a flow without a scope.
Implementing a @Volatile, private property is not a simple boilerplate for developers (really like a lazy variable) and it is an extra CPU work anywhere is defined (even it is not used).
Another approach would be to make your buyStocks function just another flow
So I should get the first() value of buyStockActions to get an instance of buyStocks _with currying_, I really don't understand how this suggestion solve/simplify my issue.
val buyStocks = buyStockActions.first()
buyStocks(10)
I propose you a simple exercise, please help me to understand.
The current flow send price's updates:
val factoryPriceFlow: Flow<Double> = flow {
while (true) {
emit(1.0)
delay(100)
emit(2.0)
delay(200)
}
}
I have to implement a simple program to print the current price, like a REST service or a trivial main function, please complete the code
fun main() {
val currentPrice: Double = TODO()
println("The current price is $currentPrice")
}
It should print "The current price is 1.0".
Can you fix/rewrite my code?
Thank you in advance.
I rethink about above checkAccess problem (required by some legacy framework, for example).
first() is a suspending method, I didn't find no way to respond quickly if configuration is not already present.
Using the current API the only proposal is:
val rolesFlow: Flow<Set<String>> = TODO()
override fun checkAccess(role: String) {
val roles =
runBlocking {
withTimeoutOrNull(100) {
rolesFlow.first()
}
} ?: emptySet()
check(role in roles)
}
I have to consider that a Flow is inadequate in this use case, instead ConflatedBroadcastChannel works well (value, valueOrNull) but it misses of read-only interface (#274).
@zach-klippenstein your previous post is reasonable, however if we have to expose the same value using WebSocket (reactive) and HTTP long polling (not reactive) then there is not much choices.
I hope that my comments help us to prepare a reasonable response for these use cases.
I have a use case for BroadcastChannel with capacity UNLIMITED.
The use case is related to RecyclerView and handling View events on it's children, and I'll be using the example of a View.OnClickListener.
RecyclerView uses the ViewHolder pattern for rendering it's children; it creates a View on demand, and then "caches" it, and reuses it when that View is scrolled off the screen. When it is reused, a method is invoked that gives you a chance to bind your data to that View. One of the benefits of this is increased performance since we don't need to allocate a new View for each child every time the RecyclerView is scrolled, and we don't need to have a 1:1 of Views to children (because that could get memory intensive very quickly).
A best practice while doing this is to limit the amount of allocation that occurs during a bind, and prefer to do it when the View is getting created (or not even allocate then if possible). This often comes into play with setting OnClickListener on a View.
Another issue is that it is difficult to easily pass these events up to a Presenter (or other architecture component) without a lot of manual wiring because of how RecyclerView works.
My solution uses a global OnClickListener. The back end of it looks like this:
/*
Note: specifically not a data class; we want an identity hashCode()
*/
class ListEvent<T>(
private val capacity: Int = 2
) {
internal data class Payload(
val listenerId: Int,
val payload: Any
)
internal val hashCode by lazy { hashCode() }
fun CoroutineScope.clicks() = produce<T>(capacity = capacity) {
forwardEvents(GlobalClickChannel, channel)
}
}
internal val GlobalClickListener = View.OnClickListener { view ->
GlobalScope.launch(Dispatchers.Main) {
GlobalClickChannel.send(
ListEvent.Payload(
listenerId = view.getTag(R.id.list_model_id) as Int,
payload = view.getTag(R.id.list_model_on_click)
)
)
}
}
internal val GlobalClickChannel = BroadcastChannel<ListEvent.Payload>(BUFFERED)
private suspend fun <T> TwentyListEvent<T>.forwardEvents(
globalChannel: BroadcastChannel<ListEvent.Payload>,
sendChannel: SendChannel<T>
) {
val receiveChannel = globalChannel.openSubscription()
for((listenerId, payload) in receiveChannel) {
if(listenerId == hashCode) {
@Suppress("UNCHECKED_CAST")
(payload as? T)?.let { typedPayload ->
sendChannel.send(typedPayload)
}
}
}
}
When I want to handle a click from a RecyclerView child, in my Presenter I maintain a reference to a ListEvent that was passed to the RecyclerView and is accessible in the child binding function. I can then set the payload, and the global OnClickListener for the view without having to make an allocation:
protected fun <T> View.setListClickListener(listener: ListEvent<T>, payload: T) {
setTag(R.id.list_model_on_click, payload)
setTag(R.id.list_model_id, listener.hashCode)
setOnClickListener(GlobalClickListener)
}
protected fun View.clearListClickListener() {
setTag(R.id.list_model_on_click, null)
setTag(R.id.list_model_id, null)
setOnClickListener(null)
}
I can consume the clicks from my Presenter by simply:
private fun CoroutineScope.observeVenueClicks() {
launch {
val clickEvents = with(clickListEvent) { clicks() }
for(event in clicks) {
...
}
}
}
The problem is, that if there's a misbehaving consumer that doesn't properly consume the events from ListEvent.clicks() (a bug, or it gets blocked, etc...), then the whole system will deadlock, when GlobalClickChannel's buffer is filled. Even if a new consumer comes later and opens a new subscription in forwardEvents it will not be able to drain GlobalClickChannel since it's not conflated. I could keep raising the buffer size, but that only delays the problem.
If I could use UNLIMITED then I would never get to a point where the whole system is deadlocked. I tried using PublishSubject but that has a similar problem, since it will wait for all the collectors, and will also be affected by a misbehaving consumer.
Will DataFlow be able to handle this scenario?
After much debugging, it turns out that my issue was that I was receiving from the BroadcastChannel's ReceiveChannel using a for loop. Using consumeEach seems to have fixed the problem.
I think that my above points still stand, even though it wasn't my current issue.
@eygraber Unfortunately, I don't know a good solution to the problem you've described. That is, if you have potentially misbehaving listeners then you have to choose between:
OutOfMemoryError. Arguably, the latter option might be preferred for mobile apps that would not live long enough to get their memory exhausted, so it does make sense to support unlimited-size buffers, indeed.
Is not allowing UNLIMITED already set in stone, or could it still be implemented? Similarly, is BroadcastChannel for sure on track to get deprecated?
We do plan to provide some sort of unlimited-buffered primitive of this kind.
I have seen the DataFlow API is marked as draft, is it going to be available soon or I should use ConflatedBroadcastChannels?
A correction here, despite the fact that
ArrayBroadcastChannelcontains a buffer it _does not_ emit to new subscribers, so it does not do replay.Would not
ConflatedBroadcastChannelwork for your use-case?
ConflatedBroadcastChannel re-emits the last sent value when I use it. BroadcastChannel(1) does not.
Thanks a lot for a fruitful discussion in this issue and tons of useful feedback. This issue was originally created to "Consider making BroadcastChannels to implement Flow" and as a result of this feedback and discussions the decision is not to implement Flow in BroadcastChannel interface, but instead provide better-designed Flow-based replacement for the use-case that are currently covered by broadcast channels. The replacements will be tracked by separate issues:
ConflatedBroadcastChannel -> StateFlow; please, welcome issue #1973. ArrayBroadcastChannel -> TBD ~ EventFlow; for now issue #1901 will be used for further discussion until an issue with a concrete design is created. I'm closing this one.
Most helpful comment
Let's bikeshed a name:
DataFlow. The name of the current prototype (#1354). It is a play on theLiveDataclass from Android and the fact that it is going to be used to represent a "_data_ model" in UI and forms a basis of "_data_ flow" programming model.ValueFlow. Plays on the fact that this flow hasvalueproperty that can be read to check what is "the current _value_" that is kept by this flow and can be updated.StateFlow. Represents the fact that this flow represents "an application _state_" that can be updated. Being a _state_ it is natural to be interested only in the most recent, conflated, value of it.Note, that the other flow class like this will be most likely called
EventFlowand will be designed for "event streams" (like mouse or button clicks) which should not be conflated and where the most recent value does not make any sense. There is not much bikeshedding about its name, since it is quite a good fit. SoDataFlow/ValueFlow/StateFlowname should mesh well and be clearly distinct fromEventFlowname.