Kotlinx.coroutines: StateFlow with SharingStarted.Lazily parameter does not start background coroutine in specific cases

Created on 21 Jan 2021 · 8 Comments · Source: Kotlin/kotlinx.coroutines

Consider the following test:

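import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@Test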
fun lazyStateFlowTest() = runBlocking(Dispatchers.Default) {
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob() + CoroutineName("Test"))

    val flow0 = flow {
        var i = 1
        while (true) {
            delay(100)
            emit(i++)
        }
    }.stateIn(scope, SharingStarted.Lazily, initialValue = 1)

    val flow1 = flow0.stateIn(scope, SharingStarted.Lazily, initialValue = 0)

    assertEquals(expected = 0, actual = flow1.first()) // actual is 0
    delay(3000)
    assertNotEquals(illegal = 0, actual = flow1.first()) // actual is 0 but must be non-zero
}

The documentation for SharingStarted.Lazily says: "Sharing is started when the first subscriber appears and never stops".
When the flow1.first() statement is executed, the documentation implies that sharing for flow1 starts in the background and never stops.
The test above shows that the actual behavior differs from the expected one: the second call to flow1.first() returns 0, although it shouldn't, because by then flow0 should have populated flow1's state with new values.

Perhaps this happens because the first() operator receives the initial value and cancels its subscription to flow1 before flow1 subscribes to flow0. In some runs the test above passes.

Another example that reproduces the race condition:

@Test
fun lazyStateFlowTest2() = runBlocking(Dispatchers.Default) {
    coroutineScope {
        repeat(1000) {
            launch {
                val x = MutableStateFlow(value = 1)
                val y = MutableStateFlow(value = 1)

                val z = x.combine(y) { a, b -> a + b }
                    .stateIn(CoroutineScope(Dispatchers.Default), SharingStarted.Lazily, initialValue = 0)

                assertEquals(expected = 0, actual = z.first())
                delay(2000)
                assertEquals(expected = 2, actual = z.first())
            }
        }
    }
}

Kotlin: 1.4.21
Coroutines: 1.4.2
Runtime: JVM

Labels: bug, flow

All 8 comments

The root cause is that stateIn does not always subscribe to the upstream flow when its downstream subscriber receives the initial value and cancels too quickly. One can argue that it is the right behavior. I wonder how critical this is and what the right behavior should be. Could you please explain how you ran into this problem? What kind of app were you writing, and what were you trying to do with flows?

> One can argue that it is the right behavior

Our documentation for SharingStarted.Lazily states: "Sharing is started when the first subscriber appears and never stops."

Now consider a simplified reproducer without flaky timings:

val flow0 = flow {
    println("Flow is started")
    emit(1)
}.stateIn(this, SharingStarted.Lazily, initialValue = 0)

assertEquals(expected = 0, actual = flow0.first())
assertNotEquals(illegal = 0, actual = flow0.first())

This code will fail: "Flow is started" won't be printed. This definitely violates the principle of least surprise and should be addressed somehow.

Agreed. I'll fix it.

Having investigated it, I don't see a way to fix it. The underlying reason for this behavior is that the Flow and SharedFlow architecture is highly asynchronous. In particular, the number of subscriptions is tracked via a conflated StateFlow<Int>. So, when a subscriber appears and disappears very quickly, the change in the number of subscribers from 0 to 1 and back to 0 can happen fast, get conflated, and go unnoticed as a result. For SharingStarted.Lazily it looks as if nothing had happened at all. The only way to "fix it" that I see so far is to update the documentation to note this fact.
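The conflation described above can be illustrated with a plain MutableStateFlow<Int>. This is a simplified model, assuming the sharing policy does nothing more than collect a counter; the counter and the function name are hypothetical stand-ins, not the library's internal subscription tracking. The counter goes from 0 to 1 and back to 0 with no suspension point in between, so the observer only ever sees 0:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Simplified model (not kotlinx.coroutines internals): a conflated counter
// watched by an observer that plays the role of a Lazily-like policy.
fun observedSubscriptionCounts(): List<Int> = runBlocking {
    val subscriptionCount = MutableStateFlow(0) // hypothetical stand-in
    val seen = mutableListOf<Int>()

    // The observer records every value it actually gets to see.
    val observer = launch { subscriptionCount.collect { seen.add(it) } }
    yield() // let the observer start; it records the initial 0

    // A subscriber appears and disappears with no suspension point in between.
    subscriptionCount.value = 1
    subscriptionCount.value = 0

    yield() // the observer runs again but only reads the latest value, 0
    observer.cancel()
    seen
}

fun main() {
    println(observedSubscriptionCounts()) // [0]
}
```

A StateFlow collector reads only the current value when it gets to run, and it skips values equal to the one it emitted last, so the brief 1 is lost and a policy waiting for a count above 0 never fires.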

> in the number of subscribers from 0 to 1 and back to 0 can happen fast

Couldn't this be worked around by providing a different initial value (like -1), so that a change to 0 would trigger the lazily started sharing?

There are other hacks that could make it work specifically for Lazily, but the problem is far more widespread. I can have a custom starting policy that waits for a specified number of subscribers to appear, and it will suffer from the same asynchrony. Frankly, I think this asynchrony should be treated as a feature, not a bug, as it pertains to many aspects of how various Flow operators work and is not limited to shared flows.
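A minimal sketch of such a custom starting policy, assuming the SharingStarted interface with its command(subscriptionCount: StateFlow<Int>) function; the class StartedAfterSubscribers and the helper function are hypothetical names for illustration. Because the policy reads the same conflated subscriptionCount, a count that reaches the threshold and drops back before the policy runs would be missed, just as with Lazily. The usage keeps two subscribers alive long enough for the policy to see them:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical policy: start sharing once at least `threshold` subscribers
// are observed at the same time, then never stop (like Lazily, but for N).
class StartedAfterSubscribers(private val threshold: Int) : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = flow {
        var started = false
        // subscriptionCount is conflated: if the count reaches `threshold` and
        // drops back before this collector runs, the peak is never seen here.
        subscriptionCount.collect { count ->
            if (count >= threshold && !started) {
                started = true
                emit(SharingCommand.START)
            }
        }
    }
}

fun upstreamStartsWithTwoLongLivedSubscribers(): Boolean = runBlocking {
    var upstreamStarted = false
    val state = flow {
        upstreamStarted = true
        emit(42)
    }.stateIn(this, StartedAfterSubscribers(2), initialValue = 0)

    // Two subscribers that stay subscribed, so the counter settles at 2.
    repeat(2) { launch { state.collect { } } }
    delay(100)
    coroutineContext.cancelChildren() // stop the subscribers and the sharing coroutine
    upstreamStarted
}

fun main() {
    println(upstreamStartsWithTwoLongLivedSubscribers()) // true
}
```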

As a library user, I would definitely want both the best performance and the expected behavior from a StateFlow created by the stateIn operator with the SharingStarted.Lazily parameter.

A StateFlow basically represents a state. If I see a StateFlow, I understand that I can get the current state (.value) or subscribe to state updates (.collect).
On the other hand, an API designer can expose a StateFlow that is either a simple MutableStateFlow or a StateFlow created with the stateIn operator.
The designer can also apply an optimization strategy depending on the nature of the data (Lazily or a custom strategy as an optimization; Eagerly means no optimization).
If a designer decides to optimize a StateFlow and a user reads the data in an unexpected way (e.g. by calling the first operator), the user risks running into unexpected behavior.

I see three solutions to this problem:

  1. Fix stateIn optimizations to eliminate any unexpected behavior
  2. Disallow optimizations for stateIn (leave only SharingStarted.Eagerly, since the required optimizations can safely be achieved with shareIn)
  3. Leave everything as is, but add a warning that stateIn with optimizations should be used with caution

Personally, I would strive for the ideal solution of a comprehensive API with the expected behavior (1), but if that is not possible for some reason (e.g. a performance regression), I would remove any controversial API that can lead to unexpected behavior (2), especially if it can safely be replaced with similar code.
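Option 2 relies on shareIn covering the lazy case. A minimal sketch, assuming that replay = 1 is an acceptable substitute for an initial value (the function name is hypothetical): with no initial value, first() has nothing to return immediately, so it stays subscribed until the upstream emits, and the subscriber count stays at 1 instead of flipping from 0 to 1 and back unnoticed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Sketch of option 2: a lazily started shareIn with replay = 1 instead of stateIn.
fun firstValueFromLazyShareIn(): Int = runBlocking {
    val shared = flow {
        println("Flow is started")
        emit(1)
    }.shareIn(this, SharingStarted.Lazily, replay = 1)

    // Replay cache is empty, so first() suspends until the upstream emits 1.
    val value = shared.first()
    // Lazily never stops on its own; cancel the sharing coroutine explicitly.
    coroutineContext.cancelChildren()
    value
}

fun main() {
    println(firstValueFromLazyShareIn()) // prints "Flow is started", then 1
}
```

The trade-off is that a SharedFlow has no .value, so callers lose synchronous access to the current state in exchange for never observing a placeholder value.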

Another solution is to track the number of subscriptions with a SharedFlow instead of a StateFlow, for the cases that care about the history of changes rather than just the latest value, which ignores changes that are reverted very quickly.
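A minimal sketch of that idea, assuming every change of the subscriber count is published as an event to a MutableSharedFlow<Int> with some buffer capacity; this is a hypothetical model with a hypothetical function name, not how kotlinx.coroutines tracks subscribers. Unlike the conflated StateFlow, the observer receives both the 1 and the 0:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical alternative to a conflated counter: each change of the
// subscriber count is an event, so a quick 0 -> 1 -> 0 keeps its history.
fun observedCountChanges(): List<Int> = runBlocking {
    val countChanges = MutableSharedFlow<Int>(extraBufferCapacity = 16)
    val seen = mutableListOf<Int>()

    val observer = launch { countChanges.collect { seen.add(it) } }
    yield() // let the observer subscribe before anything is emitted

    countChanges.tryEmit(1) // a subscriber appears...
    countChanges.tryEmit(0) // ...and disappears immediately

    yield() // let the observer drain the buffered events
    observer.cancel()
    seen
}

fun main() {
    println(observedCountChanges()) // [1, 0]
}
```

The cost of this design is buffering: if the observer falls behind by more than the buffer capacity, tryEmit fails, so a real implementation would need a policy for that case.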
