Kotlinx.coroutines: Feature - Priority Channel

Created on 12 Feb 2018  路  10Comments  路  Source: Kotlin/kotlinx.coroutines

It would great to have a priority based channel, such that I can use Comparable to make one object of a higher priority than another such as PriorityQueue. I was looking at this example.. and it would be trivial but of course this is not the core.
https://github.com/Kotlin/kotlin-coroutines/blob/master/examples/channel/channel.kt

enhancement use-case needed

Most helpful comment

For completeness I wrote this test to see how it would potentially work.

    @Test
    fun testSelectPriorityChannel() = runBlocking {

        val rnd = Random()
        val channels: List<Channel<String>> = (0..5).map { Channel<String>(2) }

        // should be infinite stream
        val j = launch {
            repeat(20) {
                val priority = rnd.nextInt(5)
                channels[priority].send("string $it priority: $priority")
            }
        }

        val j2 = launch {
            repeat(20) {
                println("Stuff --> " + receive(channels))
            }
        }

        j.join()
        j2.join()

        channels.forEach({c -> c.close()})
    }

    suspend fun <T> receive(subChannels: List<Channel<T>>): T = select {
        for (subChannel in subChannels) {
            subChannel.onReceive { it }
        }
    }

All 10 comments

@elizarov any questions, comments, concerns?

So I tried to port this to my own code base however because this Symbol is marked _internal_ I can't.. easily that is..

internal val ALREADY_SELECTED: Any = Symbol("ALREADY_SELECTED")

Can you, please, elaborate on the use-case for such a priority channel? Do you any specific usage example in mind?

I'm also referencing PR #238 here.

The specific use case is I've got a high performance application that is sending messages to the channel, some messages are much more important than others. In our case some are human driven so they expect lower latency aka higher priority. The other messages can have higher latency.

The messages in the channel are all the same type and just data.. but due to the fact that some are human driven there's an expectation even under load that latency or priority for those messages be elevated because there's a person waiting on the response.

Basically the reason for this is the channel pattern is great I want to have a workers on the channel reading data doing work, but I want the workers doing the high priority stuff first. The work is all the same so it doesn't make sense to design around a pattern that's pretty much the same as PriorityQueue.

The problem with the proposed implementation is that it is using a PriorityQueue that would loose FIFO order for messages at the same priority. It can be fixed. However, I would recommend to consider a totally different approach to your use-case. Instead of designing a special channel based on a priority queue use a separate channel for each priority instead. You can easily use select { ... } expression to receive from multiple channels (higher-priority goes first in the list) and this way you get your use-case covered without having to implement a separate channel kind. Moreover, you will not have to make all your messages Comparable, too.

Do you have an example? I struggling to see how that's better.. do I have more coroutines on the higher priority channels and rank them? The select statement is just routing.. FIFO on the same priority is not an issue for me.. Maybe your talking about having the select {} do it from the higher and then lower and lower and lower.. Anyway if you don't think its cool that's fine but could you at least make it so I can leverage the great work you guys have already done in the AbstractChannel

Hi @wdroste,
a prototype.

abstract class PriorityChannel<T>(protected val subschannels: List<Channel<T>>) : Channel<T> {
    /**
     * Send the specified [item] to a single subschannel
     */
    protected abstract suspend fun dispatch(item: T)

    override suspend fun receive(): T = select {
        for (subchannel in subschannels) {
            subchannel.onReceive { it }
        }
    }

    // many other methods...
}

In such case the order of subchannels is important (the first is priority and so on).

@fvasco and @elizarov I'll close as I understand how you've accomplished the use case and its fairly elegant.. I appreciate your time and attention to this project so thank you.

Please correct me if I'm wrong but it seems the proposed solution above will be less efficient. Since it must receive across many channels and use a select cause to determine which coroutine is available.. In addition there's probably a lock in everyone of those sub-channels (best that each one be a buffer) that must be observed.

For completeness I wrote this test to see how it would potentially work.

    @Test
    fun testSelectPriorityChannel() = runBlocking {

        val rnd = Random()
        val channels: List<Channel<String>> = (0..5).map { Channel<String>(2) }

        // should be infinite stream
        val j = launch {
            repeat(20) {
                val priority = rnd.nextInt(5)
                channels[priority].send("string $it priority: $priority")
            }
        }

        val j2 = launch {
            repeat(20) {
                println("Stuff --> " + receive(channels))
            }
        }

        j.join()
        j2.join()

        channels.forEach({c -> c.close()})
    }

    suspend fun <T> receive(subChannels: List<Channel<T>>): T = select {
        for (subChannel in subChannels) {
            subChannel.onReceive { it }
        }
    }

Just for those who still want a priority channel implementation: there is one in the kroki library https://github.com/kerubistan/kroki/blob/master/kroki-coroutines/src/main/kotlin/io/github/kerubistan/kroki/coroutines/Channels.kt

Was this page helpful?
0 / 5 - 0 ratings