What we did was:
fun <T> Observable<Option<T>>.flatten(): Observable<T> {
return flatMap {
it.map {
Observable.just<T>(it)
}.getOrElse {
Observable.empty<T>()
}
}
}
I'm happy to send a PR with it if you are happy with it.
What we'd need is for mapFilter here to be implemented, and flattenOption can be derived from there. It's not that different from the implementation you have there, if you're willing to try it ;)
One thing I'd suggest is using fold instead of map then getOrElse. Same results, one fewer intermediate object created.
If you do a PR, make sure to add it to Flowable, Maybe and Flux too, as all of them are capable of having this pattern.
fold sounds better indeed, cleaner, more concise.
I'll probably send a PR later this week.
Is anyone working on this issue? If the answer is no, I'm happy to try. I have free time this weekend.
aql could also have the rx and reactor integration modules since these instances will enable most of the AQL operators.
Oh, I completely forgot about this... I might have some time tomorrow, otherwise take it.
I didn't do it exactly as you intentded @pakoito , this is what made sense in my head, but I'm happy to iterate and discuss it: https://github.com/arrow-kt/arrow/pull/1550
Flatten is already taken as an extension function. Let's break the general solution down:
fun <T, A, B> Flowable<A>.mapFilter(f: (A) -> Option<B>) =
flatMap { a ->
f(a).fold({ Flowable.empty() }, { b -> Flowable.just(b) })
}
So now we have a function that works for every filter function, and also maps all in one go. Great. Let's specialize it!
fun <T, A> Flowable<Option<A>>.flattenOption() =
mapFilter { it } // mapFilter(::identity) to prevent an additional allocation
It turned out to be super easy to implement, and we've gotten functionality that's generic. But there's more! There is already a flattenOption function defined in FunctorFilter, so if we implement that typeclass we get the extension function for free! No need to write it ourselves.
// in the extensions file flowable.kt
// https://github.com/arrow-kt/arrow/blob/master/modules/fx/arrow-fx-rx2/src/main/kotlin/arrow/fx/rx2/extensions/flowablek.kt
@extension
interface FlowableFunctorFilter: FunctorFilter<ForFlowable>, FlowableFunctor {
override fun <A, B> Kind<ForFlowable, A>.filterMap(f: (A) -> Option<B>): Kind<ForFlowable, B> =
fix().filterMap(f)
}
This @extension annotation creates a top-level extfun flattenOption that works exclusively for Flowable! Not only that but also filter for free. So we now have 1 typeclass implementation for abstraction, and 2 extension functions for Flowable.
@Cotel I think this issue still need to be opened. We need to add FunctorFilter for Reactor. I will do it today or tomorrow.
Oh sry, I saw the merged PR and thought it was finished 馃檹