Kotlinx.coroutines: Inconsistency between Flow combine and RxJava combineLatest

Created on 5 Jun 2020  Â·  5Comments  Â·  Source: Kotlin/kotlinx.coroutines

I am looking to migrate some existing RxJava-based code to Coroutines but I ran into some test failures that touched some code that uses Observable.combineLatest. My tests originally use PublishSubjects from RxJava to mock the underlying streams within the combineLatest. However, when I switch to Flow streams in the combine and then update the tests to use Channels instead of PublishSubjects, my tests fail because the expected number of emissions are not received.

I have come up with the following reduced repro (uses MutableStateFlows instead of Channels):

    @Test
    fun `combineLatest Rx`() {
        val subject1 = BehaviorSubject.createDefault(1)
        val subject2 = BehaviorSubject.createDefault("a")

        val observer = Observables.combineLatest(subject1, subject2).test()

        subject1.onNext(2)
        subject2.onNext("b")

        subject1.onNext(3)
        subject2.onNext("c")

        println(observer.values())
    }

Result:

[(1, a), (2, a), (2, b), (3, b), (3, c)]

And then:

    @Test
    fun `combine Coroutines`() = runBlockingTest {
        val subject1 = MutableStateFlow(1)
        val subject2 = MutableStateFlow("a")
        val values = mutableListOf<Pair<Int, String>>()

        val job = launch {
            combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
                .collect { values += it }
        }

        subject1.value = 2
        subject2.value = "b"

        subject1.value = 3
        subject2.value = "c"

        job.cancel()

        println(values)
    }

Result:

[(1, a), (2, a), (2, b)]

Is this expected behavior? The only way I can seem to get the second test to produce the same emissions as the first test is to call yield() after every single value assignment.

test

Most helpful comment

I guess what I still don't understand in the original example is why anything beyond (1, a) gets emitted at all? It just sort of seems like combine required some form of yield-ing halfway through.

Ugh... That's not easy to explain. There's a lot of moving pieces involved in this machinery. Internally combine launches two coroutines to get fresh values from both flows and the corresponding code is written so that is deterministic with Unconfined dispatcher by being "fair". This fairness is achieved by calling yield(). which is implemented differently by the TestCoroutineDipsatcher. Let's leave this issue open. It might be possible to fix it so that it works with TestCoroutineDipsatcher just as well as it works with Unconfined dispatcher.

All 5 comments

Yes. It is the expected behavior. MutableStateFlow.value updates are always dispatched by default, just as coroutines are always dispatched. It is so by design, so you'll have to yield() to give them a chance to execute when you run in a single main thread. Rx uses a different default. Most Rx operators are Unconfined by default (don't have their own scheduler), but some of them do have a scheduler and you'll have to read docs to figure out which is which.

You can mimic Rx behavior with coroutines by using launch(Dispatchers.Unconfined). This way you will not have to yield.

Does it help?

Thank you for your reply @elizarov. I updated the test to use Dispatchers.Unconfined like so:

    @Test
    fun `combine Coroutines`() = runBlockingTest {
        val subject1 = MutableStateFlow(1)
        val subject2 = MutableStateFlow("a")
        val values = mutableListOf<Pair<Int, String>>()

        val job = launch(Dispatchers.Unconfined) {
            combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
                .collect { values += it }
        }

        subject1.value = 2
        subject2.value = "b"

        subject1.value = 3
        subject2.value = "c"

        job.cancel()

        println(values)
    }

Result:

[(1, a), (2, a), (2, b), (3, b), (3, c)]

Just as you said, now the emissions are consistent with those of the Rx-based test. Thank you very much!

I guess what I still don't understand in the original example is why anything beyond (1, a) gets emitted at all? It just sort of seems like combine required some form of yield-ing halfway through.

I guess what I still don't understand in the original example is why anything beyond (1, a) gets emitted at all? It just sort of seems like combine required some form of yield-ing halfway through.

Ugh... That's not easy to explain. There's a lot of moving pieces involved in this machinery. Internally combine launches two coroutines to get fresh values from both flows and the corresponding code is written so that is deterministic with Unconfined dispatcher by being "fair". This fairness is achieved by calling yield(). which is implemented differently by the TestCoroutineDipsatcher. Let's leave this issue open. It might be possible to fix it so that it works with TestCoroutineDipsatcher just as well as it works with Unconfined dispatcher.

Found another issue when testing combine(…).flowOn(testDispatcher) where the test is using runBlockingTest and testDispatcher is a TestCoroutineDispatcher:

    @Test
    fun test() = runBlockingTest {
        val testDispatcher = TestCoroutineDispatcher()
        val channel1 = Channel<Int>()
        val channel2 = Channel<String>()
        val flow1 = channel1.consumeAsFlow()
        val flow2 = channel2.consumeAsFlow()
        val values = mutableListOf<Pair<Int, String>>()

        launch {
            combine(flow1, flow2) { a: Int, b: String -> a to b }
                .flowOn(testDispatcher)
                .collect { values += it }
        }
        channel1.send(1)
        channel2.send("a")
        channel1.send(2)
        channel2.send("b")
        channel1.close()
        channel2.close()

        assertThat(values).containsExactly(
            1 to "a",
            2 to "a",
            2 to "b"
        )
    }

The test ends up failing with the following exception:

java.lang.IllegalStateException: This job has not completed yet
    at kotlinx.coroutines.JobSupport.getCompletionExceptionOrNull(JobSupport.kt:1189)
    at kotlinx.coroutines.test.TestBuildersKt.runBlockingTest(TestBuilders.kt:53)
    at kotlinx.coroutines.test.TestBuildersKt.runBlockingTest$default(TestBuilders.kt:45)

This only occurs when flowOn() is applied on the combine(). If I remove the flowOn() on the combine() but apply flowOn() to flow1, and flow2, the test proceeds to run the assertion at the bottom.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

ZakTaccardi picture ZakTaccardi  Â·  3Comments

elizarov picture elizarov  Â·  3Comments

ScottPierce picture ScottPierce  Â·  3Comments

LouisCAD picture LouisCAD  Â·  3Comments

mariusstaicu picture mariusstaicu  Â·  3Comments