Kotlinx.coroutines: combine(Iterable<Flow>) is very slow

Created on 11 Oct 2020  ·  12 Comments  ·  Source: Kotlin/kotlinx.coroutines

The combine(…) function becomes very slow depending on the number of Flows.

Flat-mapping a 40k List<List<…>> takes approx. 11 ms, with one element per list.
Flat-mapping a 40k List<Flow<…>> takes approx. 1 minute, with one element per Flow.
And that's with a very simple non-suspending test case.

If combine is expected to have such low performance it should be documented, otherwise developers like me treat it as a simple Flow-version of List<List<…>>.flatten().
If it's not expected then there is a bug.

My use case

I have ~40k objects in memory. For each object there is a Flow and a coroutine that periodically refreshes the object from the server when it's expired and emits that. At some point I have to continuously merge the latest version of all 40k objects into a single list for further processing. For that I use combine(listOfObjectFlows) { it.toList() }.
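A minimal sketch of that setup (all names and the refresh logic here are invented for illustration; the real app code is not shown in the thread):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

// Hypothetical shapes for illustration only -- not the actual app code.
class Item(val id: Int) {
    suspend fun refreshFromServer(): Item = this // placeholder for the real network call
}

// One flow per object: re-emit the object whenever it has been refreshed.
fun itemFlow(item: Item, refreshIntervalMs: Long): Flow<Item> = flow {
    var latest = item
    while (true) {
        emit(latest)
        delay(refreshIntervalMs)          // wait until the object is considered expired
        latest = latest.refreshFromServer()
    }
}

// ~40k of these flows are then merged into a single Flow<List<Item>>:
fun latestItems(flows: List<Flow<Item>>): Flow<List<Item>> =
    combine(flows) { it.toList() }
```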

Unfortunately that takes somewhere north of 15 minutes already. I have no time to let it finish to see the total…

I've written my own implementation now as a workaround.
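The workaround itself isn't shown in the thread; a rough sketch of what such a "latest values only" variant could look like (this is NOT the author's actual code, and as discussed later in the thread its output differs from combine because it may skip intermediate combinations):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Sketch only: keep one "latest value" slot per flow and emit a snapshot
// whenever any flow updates, once every flow has produced at least one value.
// The trailing conflate() lets a slow collector skip intermediate snapshots.
fun <T> List<Flow<T>>.combineLatestSketch(): Flow<List<T>> = channelFlow {
    val latest = arrayOfNulls<Any?>(size)
    val seen = BooleanArray(size)
    var missing = size
    val mutex = Mutex()
    forEachIndexed { index, flow ->
        launch {
            flow.collect { value ->
                mutex.withLock {
                    latest[index] = value
                    if (!seen[index]) { seen[index] = true; missing-- }
                    if (missing == 0) {
                        @Suppress("UNCHECKED_CAST")
                        send(latest.toList() as List<T>)
                    }
                }
            }
        }
    }
}.conflate()
```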

Test code

import kotlin.time.*
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalTime::class)
suspend fun main() {
    val flowList = (1..40_000).map { flowOf(it) }
    val start = TimeSource.Monotonic.markNow()
    val listFlow = combine(flowList) { it.toList() }

    listFlow.collect {
        println("Took: ${start.elapsedNow()}") // Took: 66.5s
    }
}


All 12 comments

Thanks for the report!
We indeed have some room for improvement, e.g. by simple tweaking I've achieved roughly +40% throughput, and there are opportunities for even bigger gains (SPSC channels, select replacements etc.).

If combine is expected to have such low performance it should be documented, otherwise developers like me treat it as a simple Flow-version of List<List<…>>.flatten()

Could you please elaborate on what exactly you would like to see in the documentation?
Any operation over the stream of flows is, _by design_, slower than its collection-based/non-suspending/sequential counterpart, because all the flows should be collected asynchronously (-> at least one coroutine/channel per flow) and we do not know whether an arbitrary instance of Flow is suspending or not.
The factor of the slowdown is operator, usage, GC and JVM specific, so it's impossible to pinpoint some real numbers as well.

I've written my own implementation now as a workaround.

It's okay if it suits you, but please be aware that it doesn't match the behaviour of our combine operator. For example, the following snippet:

val flow = flowOf("a", "b", "c")
val flow2 = flowOf("1", "2", "3")
println(flow.combine(flow2) { i, j -> i + j }.toList())
println(listOf(flow, flow2).yourCombine { list -> list[0] + list[1] }.toList())

will print

[a1, b1, b2, c2, c3]
[c1, c2, c3]

which probably is not expected.

@qwwdfsad true. I'm aware that Flows are slower than Lists in general, and I think and hope that most developers are. These 40k objects of mine each spawn several coroutines, so I probably have several hundred thousand coroutines. Yet all operations finish in a matter of seconds to milliseconds.

In this specific case I was looking for a Flow-version of List<List<…>>.flatten(). While for 40k elements it's quite fast with List, it's somewhere in the realm of 30+ minutes with combine (each Flow emitted exactly one value). That's way more than just asynchronous overhead. And that leads to your next point.

My implementation does indeed yield different results. That is in part because the current documentation is not clear on how exactly combine combines the two Flows.

Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.

My implementation also combines the most recently emitted values of each flow. It just happens to ignore intermediary values where newer ones are available. That's okay for me because I'm only interested in the most recent combination and not in intermediary results.

The fact that combine has to do some heavy lifting in order to account for all intermediary values is something I'd also mention in the docs. Being somewhat slower because it's asynchronous is very different from an O(something high) operation that has to fulfill some special requirements. At least that wasn't clear to me.

In addition to improving the documentation, maybe it makes sense to add a version like mine as an alternative operator: one that is super fast because it explicitly only cares about the most recent state and skips intermediate values when newer ones are available. It can probably be optimized even further than my simple implementation.

When I was looking for a solution to turn my List<Flow<…>> into a Flow<List<…>> I was scanning the documentation of all operators up and down multiple times for a suitable operator. Combine seemed like the only fit. With two options I would look closely to the difference between them.

Hi, @qwwdfsad,
please consider that the output of

import kotlinx.coroutines.flow.*

suspend fun main() {
    val flow = flowOf("a", "b", "c")
    val flow2 = flowOf("1", "2", "3")
    println(flow.combine(flow2) { i, j -> i + j }.toList())
}

on my AMD A8-3870 can be

[a3, b3, c3]
[c1, c2, c3]
[a1, a2, a3, b3, c3]
[a1, b1, c1, c2, c3]

@qwwdfsad was writing exactly the same as @fvasco - same applies for online playground: https://pl.kotl.in/3qubtJaOh

Yes, with suspend fun main there is no dispatcher, so multithreaded Dispatchers.Default is used and adds non-determinism.
I was implying deterministic case when combine is called from within a runBlocking or Dispatchers.Main

So [c1, c2, c3] is an expected result, all suspending functions should be Dispatcher-agnostic.

The expected result should be [a1, b1, b2, c2, c3] in a single-threaded environment.
In fact, it tries to somehow imitate "fairness", giving each flow an opportunity to emit its values even in the single-thread dispatcher. For multithreaded dispatchers, it's obviously not the case and is timing-dependent.
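A small sketch of the deterministic case (runBlocking pins everything to a single-threaded event loop; the expected output is the one stated above):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// runBlocking runs on a single-threaded event loop, so the "fair"
// interleaving of the two flows is deterministic here.
fun main() = runBlocking {
    val flow = flowOf("a", "b", "c")
    val flow2 = flowOf("1", "2", "3")
    println(flow.combine(flow2) { i, j -> i + j }.toList())
    // Expected on a single thread: [a1, b1, b2, c2, c3]
}
```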

Regarding the proposed operator, we cannot accept it for multiple reasons:
1) If it's named combine, then suddenly various overloads will behave differently for the same flows.
It's not the best idea to have f1.combine(f2) and listOf(f1, f2).combine producing different results in the same environment.
2) If it's named differently, then we end up in a situation where we have "very similar but slightly different operators and it's not clear which one to choose".

The only resolution for this issue is to improve the performance of the combine operator without changing its behaviour or semantics.

Great, that explanation should be part of the documentation 👍

I guess it's not possible to optimize the multi-threaded case by not making it fair but fast instead?

Regarding my problem I don't see how optimizing combine would help here anyway.

  • Even with a 40% gain the timing would still be a different ballpark than what I expect (minutes instead of seconds).
  • A lot of unnecessary intermediary combinations would be created that I never actually need. I'd drop many of them immediately afterwards, potentially using debounce() or expensive operators like distinctUntilChanged() or stateIn() (due to calling equals for large data sets).
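For completeness, the "drop intermediaries afterwards" approach mentioned above might look like the following sketch; it still pays for computing every intermediate combination inside combine first:

```kotlin
import kotlinx.coroutines.flow.*

// Sketch: filter out intermediate combinations after the fact. This does
// not avoid the cost of producing them inside combine in the first place.
fun <T> latestCombination(flows: List<Flow<T>>): Flow<List<T>> =
    combine(flows) { it.toList() }
        .conflate()               // skip emissions a slow collector can't keep up with
        .distinctUntilChanged()   // drop consecutive equal lists (calls equals; expensive for large lists)
```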

Regarding a new operator:

  1. I've only named it combine because it was unclear what the current implementation does. It clearly doesn't make sense to use that name if it does something different.
  2. Regarding a different operator name there are two properties to the kind of operator that I propose:

    • It combines multiple flows into one using a transform.

    • It only cares about the latest combination and not about intermediary combinations.

So what I need would perfectly describe a combineLatest operator. It combines just like combine but doesn't need to be fair because it's only interested in the latest value.

There's deprecation in place for combineLatest which maps to combine. I don't know the history of that operator or how it was defined, but at least given its name it would fit.

Hi @fluidsonic,
is f1.combineLatest(f2) equal to f1.conflate().combine(f2.conflate())?

I had to change my implementation to use collectLatest instead of collect to avoid more intermediary values.

@fvasco I ran a quick test with 5k Flows of 2 values each. All Flows are static, i.e. flowOf(Int, Int). No excessive operations.
conflate() is somewhat beneficial only if there are a high number of Flows.

Using Dispatchers.Default:

248ms - flows.combineLatest() { it }                          // number of emitted values is low but varies
3.67s - combine(flows.map { it.conflate() }) { it.toList() }  // number of emitted values is low but varies
2.38s - combine(flows) { it.toList() }                        // number of emitted values is low but varies

Using newFixedThreadPoolContext(1, "test"):

362ms - flows.combineLatest()                                 // emits single value
11.0s - combine(flows.map { it.conflate() }) { it.toList() }  // emits all intermediary values
10.9s - combine(flows) { it.toList() }                        // emits all intermediary values

Just for fun - Dispatchers.Default with 50k Flows (my app has about 40k):

886ms - flows.combineLatest()
283s  - combine(flows.map { it.conflate() }) { it.toList() }  // oh my
648s  - combine(flows) { it.toList() }                        // oh my

Even with a 40% gain the timing would still be a different ballpark than what I expect (minutes instead of seconds).

The problem was pretty simple -- accidental and well-hidden O(N^2) asymptotic where N is the number of flows.
I've fixed it, it will be available in 1.4.0 and will be even faster (by a significant margin) than proposed implementation.
In general, for a large number of flows, combine became faster by orders of magnitude and for two flows ("basic case") it became precisely two times faster.

Thanks for pointing it out!

Awesome and thanks @qwwdfsad. I'll check it out.
