Kotlinx.coroutines: Broadcast channel

Created on 5 Apr 2017  路  13Comments  路  Source: Kotlin/kotlinx.coroutines

It was considered to include a _broadcaster_?

This should be an inverse of "select", each broadcast message will sent to all registered channel, like an asynchronous event bus.

Most helpful comment

@fvasco Thanks for feedback on your use-case. I totally agree that ValueBroadcastChannel does not cover event-bass use-think. It is more appropriate for UI state management (and you don't have to explicitly yield there, because you'll yield UI thread implicitly when you finish handling UI event).

I plan to write a separate implementation (working name ArrayBroadcastChannel) to cover event-bus use-cases -- subscribers will start receiving all events (no conflation) from the time they subscribe and onwards.

All 13 comments

It is a challenging call. One one side, it is quite a common pattern, so there is definitely a benefit to include it. On the other side, there are the following considerations _against_ it:

  • it is relatively straightforward to write one on top of regular channels and actors
  • there are no well-defined _universal_ semantics to broadcast channel that are equally useful for all applications (key questions are what happens with late subscribers and how slow subscribers affect or don't affect fast ones). The actual behaviour strongly depends on minute details of a _particular_ use-case for broadcast channels that you have in mind.

Btw, what is your _particular_ use-case?

Hi, thanks for quick reply.

I use an event bus to decouple modules, so I need to send the same message to multiple unknown actors.
I tried RxJava for this purpose, it is really comfortable but too difficult to understand for this use case.
Now I'm using Guava EventBus and works well but require reflection and miss of map/reduce operators.

it is relatively straightforward to write one on top of regular channels and actors

IMHO this statement doesn't look a real cons.

there are no well-defined universal semantics to broadcast channel that are equally useful for all applications

I agree but I am ready to examine in depth these problems.

key questions are what happens with late subscribers and how slow subscribers affect or don't affect fast ones

Like a regular Channel the BroadcastChannel may implement SendChannel and define a own capacity. This is a simple option but I am not sure can be the right one, I think that this problem isn't so straightforward.

What I suggest to do is to write your implementation of Broadcast channel based on your needs as a separate mini-project (it will actually just take one file with a few pages of Kotlin code in your project) and then you can offer to contribute it to kotlinx.corotuines and we'll examine how reusable it turns out to be for other people needs to make a decision. You can read this part from the Kotlin Slack archive for inspiration: https://kotlinlang.slackarchive.io/coroutines/page-29/ts-1489592900143866

I've implement ValueBroadcastChannel that does basically the same thing as Rx BehaviorSubject. See this commit: https://github.com/Kotlin/kotlinx.coroutines/commit/4b0ef7baec0ec04943673dccfec5cbc6588d4ba5

Also, there is a new section in the guide explaining it: https://github.com/Kotlin/kotlinx.coroutines/blob/develop/reactive/coroutines-guide-reactive.md#rx-subject-vs-broadcastchannel

I believe that covers some use-cases that people encounter. @fvasco Does it cover yours?

Hi @elizarov,
thanks for your effort. It is a great job, really better than what I have in my mind.
I'm sorry but I stopped my work for an unstable API, but your looks a better point of view.

ValueBroadcastChannel is a good utility and it fits really well in another part of my work.

Unfortunately ValueBroadcastChannel has some issue for a event-bus use case:

  • You must call yield for every submission
  • Submissions are always conflated, but no events should be lost so the size of a event bus channel is UNLIMITED or 0
  • Last message in a bus is not relevant.

The idea behind is the possibility to broadcast the same message to multiple actors, on other side an actor can consume events from many channels.
So using the for statement to fetch element is useful, but I consider equally useful an utility to connect a BroadcastChannel to an ActorChannel.

PS: a connect operator can be useful also for _convert_ an unlimited channel to a conflated channel or vice versa.

@fvasco Thanks for feedback on your use-case. I totally agree that ValueBroadcastChannel does not cover event-bass use-think. It is more appropriate for UI state management (and you don't have to explicitly yield there, because you'll yield UI thread implicitly when you finish handling UI event).

I plan to write a separate implementation (working name ArrayBroadcastChannel) to cover event-bus use-cases -- subscribers will start receiving all events (no conflation) from the time they subscribe and onwards.

@fvasco I've pushed implementation of ArrayBroadcastChannel to develop branch. It covers your event-bus use-case, I think.

Hi,
I read the documentation: it looks a great job.

I want to expose a some considerations.

1.
Channel interface has Channel factory, instead BroadcastChannel interface has a specific hierarchy: this looks disomogeneous.
Maybe a nice solution is to create a BroadcastChannel interface using the same capacity values accepted by Channel factory.

2.
Make Channel implementations (RendezvousChannel, LinkedListChannel, ConflatedChannel, ArrayChannel) "public open" (if possible) can lead to an easy specialization.

3.
BroadcastChannel has open and close method, unfortunately close closes the channel but open create a new subscription to the channel.
In my opinion open can lead to confusion, a name like subscribe may be clearer.

  1. Makes sense. Will do.
  2. All the Channel implementations are _already_ open. There is not much you can customize there, though, but there are a couple of protected functions specifically designed for override (afterClose and onEnqueuedReceive)
  3. That is a tough call. subscribe calls for unsubscribe pair, but it has to be named close for conformance to Closable (which is, in turn, needed for interoperability with use stdlib function).

Maybe we can rename the whole concept of "closing" a channel, since it is confusing some users (see http://stackoverflow.com/questions/43889066/what-closing-a-kotlinx-coroutines-channel-does) however, the concept of "closing" the channel is very well known in Go/CSP world and renaming it may backfire.

3.

In my opinion close method is the right name for bot Channel and Subscription.
java.util.stream.BaseStream and java.io.OutputStream has the close() method, so this look as accettable choice; same for subscription.close().

My concern is about the open method to make subscriptions becouse channel.open doesn't refer to subscriptions; moreover a new Channel is already "open" becouse it is possible to call the close method.

A clearer name for this method may be openSubscription, or simply subscribe, like Rx Observable or Flowable.

Tip: integrate Channel's documentation with your stackoverflow answer.

Unfortunately, we cannot just call it subscribe. The idea was to have a single operation that opens a new channel subscription both for Rx extensions and for BroadcastChannel. However, the subscribe name was already taken in Rx world, hence the name open was born as the opposite of close. However, your naming concern is valid. It does not open a BroadcastChannel itself, but opens a subscription to it, so openSubscription may be a more appropriate name.

I've finally renamed open to openSubscription in develop branch, fixed one more bug with ArrayBroadcastChannel, and so I consider this issue to be closed.

Fixed naming in version 0.17

Was this page helpful?
0 / 5 - 0 ratings

Related issues

elizarov picture elizarov  路  143Comments

altavir picture altavir  路  44Comments

elizarov picture elizarov  路  40Comments

elizarov picture elizarov  路  116Comments

qwwdfsad picture qwwdfsad  路  47Comments