I just debugged an issue where I did not get any events from the flows combine function.
I had a code where I combined the latest values of a List<Observable<Recipe>> into an Observable<List<Recipe>> by using combineLatest.
I refactored that code to use flows''s combine to convert a List<Flow<Recipe>> into a Flow<List<Recipe>>.
However this never emitted an item if the data sources were empty.
I expect that the function:
@ExperimentalCoroutinesApi
public inline fun <reified T, R> combine(
flows: Iterable<Flow<T>>,
crossinline transform: suspend (Array<T>) -> R
): Flow<R> {
val flowArray = flows.toList().toTypedArray()
return flow {
combineInternal(
flowArray,
arrayFactory = { arrayOfNulls(flowArray.size) },
transform = { emit(transform(it)) })
}
}
gives me an empty array in the transform method but it just hangs forever. (coroutines 1.3.2)
Could you please provide a reproducer for your problem?
Because e.g.
val flow = flowOf<Int>()
combine(flow, flow) { arr: Array<Any?> -> }.collect()
properly completes without hanging. The same for Flowable.
But it cannot give you an empty array anyway because then we will introduce a subtle difference e.g. for
combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (a: T1, b: T2) -> R) where we don't have an empty array option and combine(flows: Iterable<Flow<T>>, crossinline transform: suspend (Array<T>) -> R) where we potentially can.
Sure:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.singleOrNull
import kotlinx.coroutines.runBlocking
inline fun <reified T> Collection<Flow<T>>.combine(): Flow<List<T>> {
return combine(this) { it.toList() }
}
fun main() {
runBlocking {
val flowsWithContent = listOf(flowOf(1), flowOf(2))
val flowsWithContentCombined = flowsWithContent.combine().singleOrNull()
check(flowsWithContentCombined == listOf(1, 2))
val flowsWithoutContent = listOf<Flow<Int>>()
val flowsWithoutContentCombined = flowsWithoutContent.combine().singleOrNull()
check(flowsWithoutContentCombined == emptyList<Int>())
}
}
But it cannot give you an empty array anyway because then we will introduce a subtle difference
How is that difference subtle? in the transform body of the two-flow way it's clear that there are two sources so there are two guaranteed values you operate on.
But for the array option it's clear that the array you get in the body has the size of the sources.
In your example, nothing hangs and works as expected.
How is that difference subtle?
combine(flow, flow2) { a, b -> println("$a $b") } will no longer be a simplified substitute for combine(flow, flow2) { arr: Array<T> -> println("${arr[0]} ${arr[1]}") }.
These two usages, though look almost interchangeably, will use different rules for preparing arguments for lambdas and will be producing different results.
The simple rule Returns a [Flow] whose values are generated with [transform] function by combining the most recently emitted values by each flow will no longer be applicable.
And we cannot say that null is "recently emitted value of the flow by default", because that's not true in general (even for combine operator). So for this particular overload, we should add to doc something like "also, if all flows have completed, but at least one of them has not emitted an element, an empty array will be passed to combine function".
Having a one overload that looks similar to others, but behaves slightly differently is a red flag for readability, maintenance and is a PITA for newcomers.
I don't understand what you're saying.
I don't want any special behavior for flows that don't emit values. I only want to have an empty array if there were no flows at all to combine.
In my example I pass pass an empty list of flows, not a list of empty flows.
Oh, sorry for the misunderstanding.
Then you can just check for the empty collection in your combine implementation:
inline fun <reified T> Collection<Flow<T>>.combine(): Flow<List<T>> {
if (isEmpty()) return flowOf(emptyList())
return combine(this) { it.toList() }
}
Why is this closed without a fix? Now I have to remember to never use the library function but always use my own function that fxes the library function.
Because library functions have to be consistent with each other.
Proposed behaviour adds subtle change between vararg and fixed-parameters combine.
Emitting an empty array when the given vararg/collection does not match "emit the most recent values emitted by each flow".
I am sorry that this behaviour does not fit your use-case
For me this is incosistent:
combine with 3 sources calls the lambda with an array of length 3.
if you clarify this with
combine with 3 sources calls the lambda with an array of length 3 only when at least one of the sources has emitted anything
it will change perspective. Nothing is emitted in 0 source case
It's wrong to close this issue. I've just debugged my code and reached to this unexpected behavior (I was about to report this as a new bug). A "combine" that is passed an empty list of flows should emit once the current result status, which is an empty list. The current behavior will hit many users.
@niqueco I don't agree. An empty flow doesn't emit. Why should an empty list of flows emit? There could be some added explanation in the docs though.
I using Flow with Room Database.
I expect, that query:
@Dao
interface CurrencyDao {
@Query("SELECT * from currencies")
fun getCurrencies(): Flow<List<CurrencyMo>>
@Query("SELECT count(*) from currencies")
suspend fun getCount(): Int
}
will return List or emptyList... when call it like: currencyDao.getCurrencies().single() but NO. When list is empty (no data in DB), flow never return any response and stuck on it (code didn't continue until any data are available, but my data never load, because this code is before load data to second table which emit update on this table - race condition)
So I must write this workaround just for using Flow in my code:
val currencies = if (currencyDao.getCount() != 0) currencyDao.getCurrencies().single() else emptyList()
@mtrakal consider this more expressive code:
val currencies = currencyDao.getCurrencies()
.onEmpty { emit(emptyList()) }
.single()
I do agree that empty flows should not be confused with a flow with a default 'empty value'. Maybe this should be Room-configurable, but I think Flow behavior here should not change.
Edit: I'm not sure why your Room flow is not emitting an empty list. Room docs suggest that should be the behavior:
Keep nullability in mind when choosing a return type, as it affects how the query method handles empty tables:
When the return type is Flow
, querying an empty table throws a null pointer exception. When the return type is Flow
, querying an empty table emits a null value. When the return type is Flow
>, querying an empty table emits an empty list.