ReceiveChannel is missing some very useful extension functions
.combineLatest()
.distinctUntilChanged()
.switchMap()
.merge()
.scan()
.filterIsInstance()
Would it be possible to add these?
Yes. However, we really want to abstract aways all kinds of streams first (see #254) and then implement all the operators on top of it. I'll keep it open though. PRs are still welcome.
It would be really useful to have the filterIsInstance operator to match that of the stdlib for collections, would it make sense to create a new issue for that?
@ashdavies Please, don't create a new issue. Let's keep a list of missing operators here.
Hopefully, channel will implement the same interface as cold stream and we will implement operators once and for all
Especially :+1: for merge, right now I have to use ugly workarounds >.<
(note how merge described by those marbles only closes when all inputs close, which is key for me)
I'd love to see more overloads for the combineLatest operator, or at least one combineLatest operator that takes a List<Flow<*>> so I can create my T1..Tx combineLatest functions myself.
In RxJava I really often use the pattern of combining the latest emissions of an observable into a view state:
Observable.combineLatest(userStream, recipeForId(id), foodFavoritesStream, userSettingsStream, ::buildViewState)
A combineLatest operator that takes a List<Flow<*>> breaks type safety, and is therefore not useful IMHO.
However, I have the same use case, and I'd like to have combineLatest overloads with up to 4 or 5 parameters (in addition to the receiver) to merge latest values of flows into a data class.
I set 4 or 5 as the hard limit because beyond that point, it's unlikely the code is readable, and intermediate combined flows should be used instead (possibly using Pair<A, B> to take advantage of destructuring).
Is that blocked by receiveOrClosed (#330) not there yet because of the function with the following signature private fun CoroutineScope.asFairChannel(flow: Flow<*>): Channel<Any>?
If not, I can work on a PR, let me know.
All of them have been eventually implemented
Merge is still missing, but filed https://github.com/Kotlin/kotlinx.coroutines/issues/1491
Most helpful comment
Hopefully, channel will implement the same interface as cold stream and we will implement operators once and for all