I noticed a few scenarios with channels where they do not seem to be fair and are filling their buffer in the wrong order. The main gist of the post can be seen from the first example, while the additional examples provide alternate scenarios with more unexpected outputs.
suspend fun sendString(channel: SendChannel<String>, s: String) {
while (true) {
channel.send(s)
}
}
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo") }
launch(coroutineContext) { sendString(channel, "BAR!") }
repeat(6) { // receive first six
println(channel.receive())
}
}
Outputs:
foo
BAR!
foo
foo
BAR!
If the channel is given a buffer, the buffer is then filled with as many foo's as it can hold, and then a single BAR! which causes an output of the following when the channel has a buffer of 4:
foo
foo
foo
foo
foo
The buffer is always filled to its max capacity with "foo" and only gets a "BAR!" afterwards at which point the send("BAR!") is suspended until the multiple foo's in the buffer are drained.
Filling the buffer like this also has some other weird results when a delay is added either before or after channel.receive() is called, as seen below:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>(2)
launch(coroutineContext) { sendString(channel, "foo") }
launch(coroutineContext) { sendString(channel, "BAR!") }
repeat(6) { // receive first six
delay(1L)
println(channel.receive())
}
}
Outputs:
BAR!
foo
BAR!
foo
BAR!
Above is what I anticipated the original code to output since that seems more like the channel is acting fair.
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo") }
launch(coroutineContext) { sendString(channel, "BAR!") }
repeat(6) { // receive first six
println(channel.receive())
delay(1L)
}
}
Outputs:
foo
BAR!
foo
BAR!
foo
From the code above and through some debugging, send("foo") is getting called, then it is getting to send("foo") a second time at which point it suspends, then it goes to send("BAR!") and suspends which causes the next strings in the buffer to be ordered such that "foo" is before "BAR!" for the first two items emitted.
And the last scenario:
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch(coroutineContext) { sendString(channel, "foo") }
launch(coroutineContext) { sendString(channel, "BAR!") }
delay(1L)
repeat(6) { // receive first six
println(channel.receive())
}
}
Outputs:
BAR!
foo
foo
BAR!
foo
This case was the weirdest as the execution of the while loop and the suspensions from send in sendString were in a very weird order that caused two "foo"'s to output in position 2 and 3.
Sorry for the long post, I noticed this while playing around with coroutines and did not think it was intended so I thought I would post here. If this is intended or there is a silly I'm making, any explanation is appreciated. Thanks.
This is a side effect of the fact that all the coroutines run under the same context in the same thread. Let me provide a step by step explanation of the first scenario (with unbuffered rendezvous channel):
launch (let's call them "foo" coroutine and "bar" coroutine). The resulting "foo", "foo", "bar, "foo", "foo", "bar", etc does look counter-intivive, though. I'll keep this issue open for two reasons:
Thanks for the detailed walkthrough, output makes a lot more sense, also helped in figuring out how the delay(1L)'s were affecting it.
Maybe yield is more appropriate than delay.
One way to solve the issue of fairness and to make this behavior more intuitive is to change the logic (and specification) of send to make it _always_ suspend on invocation. It already suspends when it cannot send (no rendezvous/buffer full), but it does not suspend otherwise. It can yield in the latter case. With this change, the execution in the topic-starter example will become _fair_. I think we can add an optional fair boolean parameter to channel constructors to enable this behavior.
The fair channels would also solve an issue of cancellability some people are facing with channels. The issue is that suspending invocations are cancellable only when they do suspend, so when you are continuously sending into the channel it is somewhat based on your luck (esp. in multi-thread scenarios) whether the sender can be cancelled. A send on a fair channel will always suspend (even if just to yield), so it will be always cancellable. This leads me think that we might want to have fair channels by default, while leaving an option of using non-fair channels as a performance optimization.
Is it "safe" to support unfair channel?
Unlimited/Conflated channel should be always fair.
Maybe this decision should be motivated, if performance gain of unfair channel is negligible then I don't find any reason to supporting it.
I'm going to find out the performance difference between fair/unfair channels before making this decision. It is also related to #113, since the performance difference should be more pronounced with SPSC channels.
A little case study on Unlimited/Conflated Channel.
The following case is valid on single thread, but it is acceptable for the ForkJoinPool on a loaded machine.
Here an example code:
fun main(vararg args: String) {
val channel = Channel<Int>(Channel.UNLIMITED) // CONFLATED
val context = newSingleThreadContext("test")
// producer
launch(context) {
var i = 0
while (true) {
channel.send(i++)
// yield()
}
}
// consumer
launch(context) {
channel.consumeEach { println(it) }
}
Thread.sleep(15000)
}
As explained above this program doesn't print anything, yield fixes this issue.
Moreover producing a large block of data without consuming leads some issue on GC and CPU cache (expecialy for unlimited channel).
I'm going to find out the performance difference between fair/unfair channels before making this decision.
@elizarov since there were no posts here for quite some time: is there any updates on the before mentioned change in the behaviour? Is it planned to be implemented before experimental API and specification become stable?
No update so far. We've been thinking about fairness of the channels, though, and here are some thoughts:
So, the current thinking that that we should make channels "partially fair" -- yield periodically (say on every 32 sends of something like that). It is not easy to add to the current design of channel, but as we work on a different channel implementation that feature can be quite easily incorporated at little additional cost.
@elizarov are there any updates? Current suggestion to overcome this issue is to add yield before any send invocation (for ex. as extension method) or may be there is other more suitable way?
yield() is still the way to go. We use it in our code, and in the code of some Flow operators, too.
Most helpful comment
This is a side effect of the fact that all the coroutines run under the same context in the same thread. Let me provide a step by step explanation of the first scenario (with unbuffered rendezvous channel):
launch(let's call them "foo" coroutine and "bar" coroutine).The resulting "foo", "foo", "bar, "foo", "foo", "bar", etc does look counter-intivive, though. I'll keep this issue open for two reasons: