I am looking to migrate some existing RxJava-based code to Coroutines but I ran into some test failures that touched some code that uses Observable.combineLatest. My tests originally use PublishSubjects from RxJava to mock the underlying streams within the combineLatest. However, when I switch to Flow streams in the combine and then update the tests to use Channels instead of PublishSubjects, my tests fail because the expected number of emissions are not received.
I have come up with the following reduced repro (uses MutableStateFlows instead of Channels):
@Test
fun `combineLatest Rx`() {
val subject1 = BehaviorSubject.createDefault(1)
val subject2 = BehaviorSubject.createDefault("a")
val observer = Observables.combineLatest(subject1, subject2).test()
subject1.onNext(2)
subject2.onNext("b")
subject1.onNext(3)
subject2.onNext("c")
println(observer.values())
}
Result:
[(1, a), (2, a), (2, b), (3, b), (3, c)]
And then:
@Test
fun `combine Coroutines`() = runBlockingTest {
val subject1 = MutableStateFlow(1)
val subject2 = MutableStateFlow("a")
val values = mutableListOf<Pair<Int, String>>()
val job = launch {
combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
.collect { values += it }
}
subject1.value = 2
subject2.value = "b"
subject1.value = 3
subject2.value = "c"
job.cancel()
println(values)
}
Result:
[(1, a), (2, a), (2, b)]
Is this expected behavior? The only way I can seem to get the second test to produce the same emissions as the first test is to call yield() after every single value assignment.
Yes. It is the expected behavior. MutableStateFlow.value updates are always dispatched by default, just as coroutines are always dispatched. It is so by design, so you'll have to yield() to give them a chance to execute when you run in a single main thread. Rx uses a different default. Most Rx operators are Unconfined by default (don't have their own scheduler), but some of them do have a scheduler and you'll have to read docs to figure out which is which.
You can mimic Rx behavior with coroutines by using launch(Dispatchers.Unconfined). This way you will not have to yield.
Does it help?
Thank you for your reply @elizarov. I updated the test to use Dispatchers.Unconfined like so:
@Test
fun `combine Coroutines`() = runBlockingTest {
val subject1 = MutableStateFlow(1)
val subject2 = MutableStateFlow("a")
val values = mutableListOf<Pair<Int, String>>()
val job = launch(Dispatchers.Unconfined) {
combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
.collect { values += it }
}
subject1.value = 2
subject2.value = "b"
subject1.value = 3
subject2.value = "c"
job.cancel()
println(values)
}
Result:
[(1, a), (2, a), (2, b), (3, b), (3, c)]
Just as you said, now the emissions are consistent with those of the Rx-based test. Thank you very much!
I guess what I still don't understand in the original example is why anything beyond (1, a) gets emitted at all? It just sort of seems like combine required some form of yield-ing halfway through.
I guess what I still don't understand in the original example is why anything beyond
(1, a)gets emitted at all? It just sort of seems likecombinerequired some form ofyield-ing halfway through.
Ugh... That's not easy to explain. There's a lot of moving pieces involved in this machinery. Internally combine launches two coroutines to get fresh values from both flows and the corresponding code is written so that is deterministic with Unconfined dispatcher by being "fair". This fairness is achieved by calling yield(). which is implemented differently by the TestCoroutineDipsatcher. Let's leave this issue open. It might be possible to fix it so that it works with TestCoroutineDipsatcher just as well as it works with Unconfined dispatcher.
Found another issue when testing combine(…).flowOn(testDispatcher) where the test is using runBlockingTest and testDispatcher is a TestCoroutineDispatcher:
@Test
fun test() = runBlockingTest {
val testDispatcher = TestCoroutineDispatcher()
val channel1 = Channel<Int>()
val channel2 = Channel<String>()
val flow1 = channel1.consumeAsFlow()
val flow2 = channel2.consumeAsFlow()
val values = mutableListOf<Pair<Int, String>>()
launch {
combine(flow1, flow2) { a: Int, b: String -> a to b }
.flowOn(testDispatcher)
.collect { values += it }
}
channel1.send(1)
channel2.send("a")
channel1.send(2)
channel2.send("b")
channel1.close()
channel2.close()
assertThat(values).containsExactly(
1 to "a",
2 to "a",
2 to "b"
)
}
The test ends up failing with the following exception:
java.lang.IllegalStateException: This job has not completed yet
at kotlinx.coroutines.JobSupport.getCompletionExceptionOrNull(JobSupport.kt:1189)
at kotlinx.coroutines.test.TestBuildersKt.runBlockingTest(TestBuilders.kt:53)
at kotlinx.coroutines.test.TestBuildersKt.runBlockingTest$default(TestBuilders.kt:45)
This only occurs when flowOn() is applied on the combine(). If I remove the flowOn() on the combine() but apply flowOn() to flow1, and flow2, the test proceeds to run the assertion at the bottom.
Most helpful comment
Ugh... That's not easy to explain. There's a lot of moving pieces involved in this machinery. Internally
combinelaunches two coroutines to get fresh values from both flows and the corresponding code is written so that is deterministic withUnconfineddispatcher by being "fair". This fairness is achieved by callingyield(). which is implemented differently by theTestCoroutineDipsatcher. Let's leave this issue open. It might be possible to fix it so that it works withTestCoroutineDipsatcherjust as well as it works withUnconfineddispatcher.