We need a broadcast operator that turns a ReceiveChannel into a BroadcastChannel.
Should the broadcast operator start sending elements right away? Or should it wait for a method call, like connect on the result of the Rx publish operator?
We are talking about hot channels here, so it makes sense to start sending them immediately. It would be an interesting design question for broadcast() (and for produce()) operators on cold streams (see #254), but I would also argue that for cold channels, too, the very invocation of broadcast() is a signal that you want to "materialize" the source (start running it).
I agree, with a hot channel there is no reason not to start immediately.
But for a cold channel, I think it is more debatable. Even though I would be OK with starting immediately, I think some users could find it useful to at least decide how it behaves depending on the use case. With Rx, for example, the user may use connect, autoConnect, refCount and share to manage it.
However, may I ask what you would think about the following implementation?
fun <E> ReceiveChannel<E>.broadcast(capacity: Int): BroadcastChannel<E> {
    val broadcast = BroadcastChannel<E>(capacity)
    val job = launch(Unconfined) {
        try {
            consumeEach { broadcast.send(it) }
        } finally {
            broadcast.close()
        }
    }
    return object : BroadcastChannel<E> by broadcast {
        override fun close(cause: Throwable?): Boolean {
            job.cancel(cause)
            return broadcast.close(cause)
        }
    }
}
@jcornaz you should propagate the close cause to broadcast.close, that is, add catch(e: Throwable) before finally to remember the cause.
It also seems better to implement a broadcast { ... } builder for general use that performs this binding of a broadcast channel to a job, and then implement the .broadcast() operator via that broadcast { ... } builder in a straightforward way.
And that will be good for now.
For the future, though, note that all current builders (like produce and actor) return a new object that delegates all the channel methods to the underlying channel (like by broadcast in your implementation). This is a suboptimal solution. If we introduce a linked job in every channel implementation (see discussion in #260), then channels returned by produce, actor, and broadcast will be more efficient, as they will not require any delegation (because every channel will natively support a linked job).
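The close-cause suggestion can be sketched without BroadcastChannel, using a plain SendChannel as the target. This is only a minimal sketch, assuming a recent kotlinx.coroutines where launch is an extension on CoroutineScope (unlike the top-level launch(Unconfined) in the snippet above). The names pipe and pipeDemo are hypothetical, and reporting the failure only through the target channel instead of rethrowing it is a choice made for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: copies `source` into `target` and closes `target`
// with the same cause that ended the copy loop (null on normal completion).
fun <E> CoroutineScope.pipe(source: ReceiveChannel<E>, target: SendChannel<E>): Job = launch {
    var cause: Throwable? = null
    try {
        for (element in source) target.send(element)
    } catch (e: Throwable) {
        cause = e // remember why the loop stopped; not rethrown in this sketch
    } finally {
        target.close(cause)
    }
}

// Feeds two elements, then closes the source with a failure.
// Returns the elements seen downstream and the failure message seen downstream.
suspend fun pipeDemo(): Pair<List<Int>, String?> = coroutineScope {
    val source = Channel<Int>(Channel.UNLIMITED)
    source.trySend(1)
    source.trySend(2)
    source.close(IllegalStateException("boom"))

    val target = Channel<Int>(Channel.UNLIMITED)
    pipe(source, target)

    val seen = mutableListOf<Int>()
    var failure: String? = null
    try {
        for (x in target) seen += x
    } catch (e: IllegalStateException) {
        failure = e.message
    }
    seen to failure
}
```

Calling runBlocking { pipeDemo() } should show that the buffered elements still arrive and that the consumer of target then sees the original failure rather than a plain close.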
I'll do it myself.
@sdeleuze Can you please elaborate a bit on your use cases? I'm slightly worried that the existing implementations of broadcast channel may not cover your needs, so we may need to add some new ones. In particular, I cannot figure out what kind of BroadcastChannel we shall create in broadcast() by default.
What kind of behavior is desired? What should happen to the items produced before the first invocation of openSubscription? Shall they be dropped or buffered? If buffered, then up to what limit? If they are buffered, then what should happen after the first openSubscription? Shall they be dropped or buffered (basically forever) for all the subsequent subscribers?
I cannot figure out what kind of BroadcastChannel we shall create in broadcast() by default
Couldn't it be a capacity argument, capable of accepting Channel.CONFLATED for example?
The use case is to multicast elements from a hot stream to multiple clients. For example, we can imagine a coroutine-based HTTP client on the server side requesting a remote JSON streaming endpoint and broadcasting to 100+ SSE endpoints; that's what we do with Flux + share(). No need to buffer, items could be lost for late subscribers.
@sdeleuze Thanks. That is the same conclusion I came to -- mimic share by default. So, by default broadcast() is going to use CoroutineStart.LAZY and start only when the first subscriber comes in. That guarantees that the first subscriber never loses elements. Late subscribers may lose them, though.
The tradeoff in this behavior is that the producer of the stream will be suspended (backpressure) until the first subscriber appears. I really hope we are choosing the right default. Maybe we should have no default at all, but always require an explicit specification of what to do.
Why not just have .share() instead of .broadcast() for that? It's not top level, and it mimics existing Rx behavior, so there will be less confusion. Then the top-level broadcast would be like a ConnectableObservable, where connect == openSubscription. It's a big advantage to have people able to reuse knowledge from other frameworks rather than having to learn another one, and those used to existing ones could get confused by conflicting behaviors of similarly working components. Having existing comparisons also makes documenting easier.
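To make the "lazy start, suspended producer" behavior concrete, here is a hand-rolled sketch. It is not the library's broadcast() and not the implementation discussed in this thread: LazyShare and lazyShareDemo are hypothetical names, and the class assumes single-threaded use (for example inside runBlocking), unbounded per-subscriber buffers, and no close-cause propagation. It only illustrates that the pump starts on the first openSubscription, that the first subscriber sees every element, and that a late subscriber can miss them.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper, NOT the library's broadcast().
// Simplifications: single-threaded use only, unbounded subscriber buffers,
// no close-cause propagation. A LAZY child that is never started would keep
// its parent scope from completing, so open at least one subscription.
class LazyShare<E>(scope: CoroutineScope, private val source: ReceiveChannel<E>) {
    private val subscribers = mutableListOf<Channel<E>>()
    private var finished = false

    // LAZY: the pump does not touch `source` until start() is called.
    private val pump: Job = scope.launch(start = CoroutineStart.LAZY) {
        try {
            for (element in source) {
                // Snapshot the list so late subscriptions do not disturb the loop.
                for (s in subscribers.toList()) s.trySend(element)
            }
        } finally {
            finished = true
            subscribers.forEach { it.close() }
        }
    }

    fun openSubscription(): ReceiveChannel<E> {
        val channel = Channel<E>(Channel.UNLIMITED)
        if (finished) {
            channel.close() // too late: nothing is replayed
        } else {
            subscribers += channel
        }
        pump.start() // does nothing once the pump has already started
        return channel
    }
}

// Returns (elements produced before any subscriber, first subscriber's view, late subscriber's view).
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun lazyShareDemo(): Triple<List<Int>, List<Int>, List<Int>> = coroutineScope {
    val produced = mutableListOf<Int>()
    val source = produce {
        for (i in 1..3) {
            produced += i
            send(i) // rendezvous: suspends until the pump receives
        }
    }
    val shared = LazyShare(this, source)
    delay(50)
    val beforeFirst = produced.toList() // producer is parked on its first send

    val first = mutableListOf<Int>()
    for (x in shared.openSubscription()) first += x

    val late = mutableListOf<Int>()
    for (x in shared.openSubscription()) late += x

    Triple(beforeFirst, first, late)
}
```

Run from runBlocking, the producer should get no further than its first element until the first subscription is opened, which is the backpressure tradeoff described above.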
@dave08 We _still_ have to figure what share and/or broadcast does by default regardless of how we name it. We can have different methods with different behaviour, but it does not make it any simpler. How would I remember which one does what?
Since we are talking about hot streams here, it does matter what we do. It is not so much of a concern for cold, Rx-style streams.
Let me clarify why I think "lazy start, suspend producer" is a good default. It is a good default because it is consistent with all the other channel operations we have. If you write:
val dest = source.filter { blah-blah }
then the source is generally suspended until you start consuming dest (because of backpressure propagation). It seems logical to have the same behavior (by default) with broadcast.
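The backpressure propagation can be observed with a small sketch. It assumes a recent kotlinx.coroutines where produce is an extension on CoroutineScope, and it replaces the filter operator with a hand-written stage so that it does not depend on a particular operator being available. The names numbers, evens and backpressureDemo are hypothetical, and the demo is meant to be called from runBlocking.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical source: logs each value right before offering it downstream.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbers(log: MutableList<Int>): ReceiveChannel<Int> = produce {
    for (i in 1..5) {
        log += i
        send(i) // rendezvous channel: suspends until someone receives
    }
}

// Hand-written stand-in for `source.filter { it % 2 == 0 }`.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.evens(source: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in source) {
        if (x % 2 == 0) send(x)
    }
}

// Returns (values the source had produced while nobody consumed dest, values received from dest).
suspend fun backpressureDemo(): Pair<List<Int>, List<Int>> = coroutineScope {
    val log = mutableListOf<Int>()
    val dest = evens(numbers(log))

    delay(100) // nobody reads dest yet
    val producedWhileIdle = log.toList()

    val received = mutableListOf<Int>()
    for (x in dest) received += x // consuming dest lets the source run to completion

    producedWhileIdle to received
}
```

While dest is not consumed, the filtering stage is stuck on its own send, so the source cannot get through all five values; it only finishes once the consumer starts receiving.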
I see your point, I just felt that the learning curve of the current implementation is bigger since the concepts are mostly new ones, with little correlation with existing popular frameworks.
It shouldn't have to be all the same, just similar enough, whenever possible, so as not to have to relearn everything.
We're still switching from Rx because of all the advantages of working with coroutines. But I think that taking the general concepts and the way they work, and adapting them to coroutines as much as it makes sense, would make things much easier to learn and to build on, and would reuse battle-tested patterns that currently deal with a wider range of use cases.
Correct me if I'm wrong, but it is my understanding that coroutine channels themselves are not attempting to comply with the reactive-streams initiative but do provide a toolkit for which a reactive-stream implementation would greatly benefit. Although many of the concepts are somewhat aligned, the implementations aren't attempting to mirror each other and an abstraction on top of channels would be the best place to offer reactive-stream compliant functionality for ease of use by devs already familiar with Rx, etc.
@caleb-allen Channels are quite an orthogonal concept to reactive streams. Channels come from the CSP (Communicating Sequential Processes) world. In that world, channels are things that you are supposed to share between communicating processes. Channels are like queues -- send at one end and receive on the other end. They are very much unlike reactive streams in many respects.
However, when we start expanding the repertoire of channels that we support (like broadcast channels discussed here) we inevitably somewhat encroach onto the territory of reactive streams which sometimes results in confusion.
Channels are not something that reactive streams might readily benefit from, but Kotlin coroutines, and suspending functions in particular, are something to watch for. See, the reactive streams specification is quite involved precisely because it is designed to work around the lack of suspending functions in Java. Reactive streams is a clever hack to support asynchronous data streams in a language that does not support asynchrony natively. That is why Kotlin's only interest in reactive streams is from an interop standpoint. We have all sorts of adapters for reactive streams, but there is not much benefit in using them in Kotlin as is.
Ah, that makes sense! Still wrapping my head around how it all fits (or does not fit) together.
So if I'm understanding this correctly, what reactive streams attempts to achieve for Java (asynchronous streams following the reactive pattern) is something that is inherent in Kotlin's design because of coroutines? And that's why it may be useless to try to implement reactive streams for Kotlin, because it would be bypassing a core language feature. Do I have that right?
@caleb-allen For asynchronous functions returning a single result we have a robust language feature (suspending functions). For asynchronous streams of values we still need something in the library. Given that asynchronous functions are natively supported in the language, this library's design will necessarily be different from reactive streams (see #254).
I want broadcast() to have ConflatedBroadcastChannel-like behavior, where the latest emission is replayed to new subscribers. Is this possible?
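The distinction can be illustrated with a short sketch, assuming a recent kotlinx.coroutines where produce is an extension on CoroutineScope. The names fetchAnswer, answers and collectAnswers are hypothetical, and delay merely stands in for a non-blocking call.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// One asynchronous result: a plain suspending function, a language feature.
suspend fun fetchAnswer(): Int {
    delay(10) // stands in for a non-blocking I/O call
    return 42
}

// Many asynchronous results: this needs a library abstraction, here a channel.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.answers(): ReceiveChannel<Int> = produce {
    repeat(3) { index -> send(fetchAnswer() + index) }
}

// Drains the channel into a list.
suspend fun collectAnswers(): List<Int> = coroutineScope {
    val out = mutableListOf<Int>()
    for (x in answers()) out += x
    out
}
```

The single value needs nothing beyond the suspend modifier, while the stream relies on the library's produce builder and channel iteration.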