Kotlinx.coroutines: Buffered Channels not providing all values for consumption

Created on 12 Feb 2018  路  4Comments  路  Source: Kotlin/kotlinx.coroutines

Hello,
Using a buffered channel and a pipeline pattern, not all the messages in the buffer are being consumed as expected. Using the code:

fun main(args: Array<String>) = runBlocking<Unit> {

    val p1 = myProducer()
    val c1 = myConsumer(p1)
    c1.join()
}

fun myProducer() = produce<Int> (capacity=10)  {
    println("Producer1 is working in thread ${Thread.currentThread().name}")
    for (i in 1..10)
    {
        println("Sending " + i.toString())
        send(i)
        delay(300L)
    }
    channel.close()
}

fun myConsumer(channel: ReceiveChannel<Int>) = async {
    println("Consumer is working in thread ${Thread.currentThread().name}")
    channel.consumeEach {
        println("Processing " + it.toString())
        delay(3000L)
    }
}

Produces the following output:

Producer1 is working in thread ForkJoinPool.commonPool-worker-1
Sending 1
Consumer is working in thread ForkJoinPool.commonPool-worker-2
Processing 1
Sending 2
Sending 3
Sending 4
Sending 5
Sending 6
Sending 7
Sending 8
Sending 9
Sending 10
Processing 2

Process finished with exit code 0

Only the first two messages in the channel are getting processed before the program exits, when there should be 10 values in the channel. From reading the documentation, this was not the expected behaviour.

Most helpful comment

In comparison, when the channel is created outside the producer and passed in, the behaviour is as-expected. It looks like the producer builder isn't holding an ArrayChannel open (or LinkedListChannel for that matter) until it's empty, which I would suggest should be the correct behaviour.

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(5)
    val p1 = myProducer(channel)
    val c1 = myConsumer(channel)
    c1.await()
}

fun myProducer(channel : Channel<Int>) = async  {
    println("Producer1 is working in thread ${Thread.currentThread().name}")
    for (i in 1..10) {
        println("Sending " + i.toString())
        channel.send(i)
        delay(300L)
    }
    channel.close()
}

fun myConsumer(channel: ReceiveChannel<Int>) = async {
    println("Consumer is working in thread ${Thread.currentThread().name}")
    channel.consumeEach {
        println("Processing " + it.toString())
        delay(3000L)
    }
}

and the output:

Producer1 is working in thread ForkJoinPool.commonPool-worker-1
Sending 1
Consumer is working in thread ForkJoinPool.commonPool-worker-2
Processing 1
Sending 2
Sending 3
Sending 4
Sending 5
Sending 6
Sending 7
Processing 2
Sending 8
Processing 3
Sending 9
Processing 4
Sending 10
Processing 5
Processing 6
Processing 7
Processing 8
Processing 9
Processing 10

Process finished with exit code 0

All 4 comments

Is it possible the 'close' on the channel is preventing the rest of the messages.. basically if you move the close into the main method after the 'join'?

I can't move close() into the main function after join, as p1 returns a ReceiveChannel, which does not have a close() method.

If I remove close entirely, the results are still as shown.
Replacing close with a long delay in the producer also produces the same results.
I tried to replace close with some kind of blocking call that would force the producer to wait until the channel was empty, but I couldn't find any such method.

In comparison, when the channel is created outside the producer and passed in, the behaviour is as-expected. It looks like the producer builder isn't holding an ArrayChannel open (or LinkedListChannel for that matter) until it's empty, which I would suggest should be the correct behaviour.

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(5)
    val p1 = myProducer(channel)
    val c1 = myConsumer(channel)
    c1.await()
}

fun myProducer(channel : Channel<Int>) = async  {
    println("Producer1 is working in thread ${Thread.currentThread().name}")
    for (i in 1..10) {
        println("Sending " + i.toString())
        channel.send(i)
        delay(300L)
    }
    channel.close()
}

fun myConsumer(channel: ReceiveChannel<Int>) = async {
    println("Consumer is working in thread ${Thread.currentThread().name}")
    channel.consumeEach {
        println("Processing " + it.toString())
        delay(3000L)
    }
}

and the output:

Producer1 is working in thread ForkJoinPool.commonPool-worker-1
Sending 1
Consumer is working in thread ForkJoinPool.commonPool-worker-2
Processing 1
Sending 2
Sending 3
Sending 4
Sending 5
Sending 6
Sending 7
Processing 2
Sending 8
Processing 3
Sending 9
Processing 4
Sending 10
Processing 5
Processing 6
Processing 7
Processing 8
Processing 9
Processing 10

Process finished with exit code 0

That is a duplicate of #256 bug. I've verified that it works as expected with the fix.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

LouisCAD picture LouisCAD  路  64Comments

elizarov picture elizarov  路  60Comments

elizarov picture elizarov  路  45Comments

elizarov picture elizarov  路  143Comments

altavir picture altavir  路  44Comments