Kotlinx.coroutines: Why are offerInternal and takeFirstReceiveOrPeekClosed using such high CPU time?

Created on 23 Mar 2018 · 6 comments · Source: Kotlin/kotlinx.coroutines

I am experimenting with a project for using coroutines and channels under the hood for reactive-style data streams:

https://github.com/caleb-allen/Konko-Flow

The idea is that any (stateless) operator can be run concurrently. As an example:

val a = listOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Flow.from(a)
        .map { "$it says hi!" }
        .forEach { println(it) }

Each stage would have coroutine(s) processing the operator, and channel(s) to send the data downstream:

[fromOp] -------- channel -------- [mapOp] -------- channel -------- [forEachOp]

Each "stage" is an operator plus a channel. A stateless stage may have multiple coroutines processing the operation, and a channel for each coroutine to pass the results down. We only need a single stream to show the issue.

[VisualVM profiling snapshot: channels_offerinternal_visualvm]

Here is a JVM snapshot using VisualVM. This is using one stream. I've been digging through the source to figure out what is hanging, but I am at a loss. As you can see, AbstractSendChannel.send() is taking significantly more time than the user code ChannelTest$collectTest$times$1$wordsCount$1.invoke(), where the operation is being applied. I am using unlimited channels (LinkedListChannel). Am I misunderstanding or misusing channels?

Best,

Caleb

Labels: performance, question

All 6 comments

Is it possible that the coroutines on either side of an operation are waiting on a lock, and that those coroutines are running on different threads? In other words, must a producer wait for the consumer to finish before it can send another item?

Also, if this is the case, is there anything that can be done to mitigate it? If the coroutines run on the same thread, with the same coroutine context, does that avoid these locking issues?
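
The single-threaded setup I have in mind would look something like the sketch below (using newSingleThreadContext from kotlinx.coroutines; the item count and channel capacity are arbitrary). Both coroutines share one dispatcher, so they take turns on the same thread instead of handing items across threads.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() {
    // Pin both sides of the channel to one single-threaded dispatcher:
    // the producer and consumer never run in parallel, they only take
    // turns on the same thread, so there is no cross-thread handoff.
    val single = newSingleThreadContext("pipeline")
    runBlocking(single) {
        val channel = Channel<Int>(Channel.UNLIMITED)
        launch {                         // producer
            repeat(1_000) { channel.send(it) }
            channel.close()
        }
        launch {                         // consumer
            var count = 0
            for (item in channel) count++
            println("received $count items")
        }
    }
    single.close()
}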

Hi, could you please attach an example which hangs or uses too much CPU where it shouldn't (it's fine if it is built with Konko-Flow primitives)? Coroutines do not use locks under the hood, so the problem should be somewhere else.

If you look in Dispatcher.kt, the OneToOneDispatcher class is what is launching these coroutines in this case.

ChannelTest.collectTest() is where the profiling snapshot comes from, ingesting a 2 GB text file found here.

Flow.from transforms the file into a Flow, which is a collection of "downstream" channels. In its current configuration that only ever consists of one downstream channel, but it could be more.

.flatMap builds an Operator which is an extension of Flow with an operation feeding the downstream channels.

OneToOneDispatcher is where upstream data is received, an operation is applied, and the results are sent downstream. Here is the code:

        upstreams.forEachIndexed { index, upstream ->
            launch {
                // Each upstream channel gets its own coroutine and its own downstream channel.
                val downstream = downstreams[index]
                val opActions = object : OperationActions<U> {
                    override suspend fun send(item: U) {
                        downstream.send(item)
                    }

                    override suspend fun done() {
                        downstream.close()
                    }
                }

                // Apply the operation to every upstream item, then close the
                // downstream channel once the upstream is exhausted.
                for (item in upstream) {
                    operation.apply(item, opActions)
                }
                downstream.close()
            }
        }

Multiple OneToOneDispatcher instances can be chained together with different operations, so a downstream from dispatcher A would be an upstream for dispatcher B.

Is that what you are asking for?

Yes, that's what I needed, thanks.

I've run your test in a loop for a while and profiled it:
1) Your code (everything starting from ChannelTest$) takes ~30% of total execution time
2) ~14% is spent reading the file
3) ~50% is communication overhead

In the pipeline Flow.from(file).partition().flatMap().collect(), every source element is passed through 4 concurrent channels and up to 4 context switches, which is a performance killer. Our own channel implementation currently suffers from the same issue: https://github.com/Kotlin/kotlinx.coroutines/issues/285. As a solution, consider fusing all intermediate operations into one channel, or build your library on top of the kotlinx channel operators and wait until https://github.com/Kotlin/kotlinx.coroutines/issues/285 is fixed.
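
To make "fusing" concrete: apply all intermediate operations inside a single coroutine so that each source element crosses only one channel. A rough sketch under that assumption (the fusedStage helper and its signature are made up for illustration; they are not kotlinx.coroutines or Konko-Flow API):

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Instead of  source -> ch1 -> map -> ch2 -> flatMap -> ch3 -> collect,
// all intermediate steps run inside one coroutine, so every element
// crosses a single channel (and at most one context switch).
fun <T, R> CoroutineScope.fusedStage(
    source: Iterable<T>,
    fused: (T) -> List<R>   // map + flatMap + ... composed into one function
): Channel<R> {
    val out = Channel<R>(Channel.UNLIMITED)
    launch {
        for (item in source) {
            for (result in fused(item)) out.send(result)
        }
        out.close()
    }
    return out
}

fun main() = runBlocking {
    val words = fusedStage(listOf("a b", "c d")) { line -> line.split(" ") }
    for (w in words) println(w)
}

Here the composed fused function stands in for whatever chain of intermediate operators the pipeline would otherwise spread across separate channel-connected stages.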

Am I misunderstanding or misusing channels?

No, in general your approach to building Konko-Flow pipelines is fine.

As an additional performance improvement we could provide specialized Channel implementations such as an array-based OneToOneChannel and OneToManyChannel, but that improvement would be negligible in comparison with operator fusion.

Thank you @qwwdfsad. I'll keep an eye on #285 and adapt the library accordingly.

If I might ask, what did you use to get those profile results?

I used async-profiler and disabled debug mode, which is enabled in tests by default.
