I need analog for rxJava takeUntil operator. This logic can be found here
Will such an operator be added to the standard library?
What is your use-case for such an operator? Can you give an example on how you'd use it in your application?
Sure. I need to cancel the flow.
sealed class CatAction {
object StartFetch: CatAction()
object CancelFetch: CatAction()
data class UpdateCats(val cats: List<Cat>): CatAction()
}
suspend fun fetchCats(): List<Cat> {
return httpClient.fetchCats()
}
fun catFetcher(actions: Flow<CatAction>): Flow<CatAction> {
return actions
.mapNotNull { it as? CatAction.FetchCat }
.flatMapConcat { action ->
flow {
val cats = fetchCats()
emit(UpdateCats(cats))
}
.takeUntil(actions.mapNotNull { it as? CatAction.CancelFetch })
}
}
This is a super-scary code, since you are collecting actions twice (independently) and back-to-back sequence of CatAction.FetchCat and CatAction.CancelFetch can get processed in any order. I would suggest to simply rewrite it like this:
fun catFetcher(actions: Flow<CatAction>): Flow<CatAction> {
return actions
.mapNotNull { it as? CatAction.FetchCat }
.flatMapLatest { action ->
// this flow will get cancelled as soon as new action comes in
flow {
val cats = fetchCats()
emit(UpdateCats(cats))
}
}
}
Will it work for you?
No. It's not work for me. I need to process only one CatAction.FetchCat per moment. In your code it will be cancel current request and start new.
So I need this behavior:
When the first CatAction.FetchCat is received, start the request.
When receiving new CatAction.FetchCat while the first CatAction.FetchCat is processing, ignore new CatAction.FetchCat.
When the CatAction.CancelFetch is received, cancel the request.
UPD:
when I wrote the usage example, I forgot to add filterNot under flatMapConcat for ignore new actions.
Fixed example:
sealed class CatAction {
object StartFetch: CatAction()
object CancelFetch: CatAction()
data class UpdateCats(val cats: List<Cat>): CatAction()
}
data class CatState(
val cats: List<Cat>? = null,
val isLoading: Boolean = false
)
suspend fun fetchCats(): List<Cat> {
return httpClient.fetchCats()
}
fun catFetcher(actions: Flow<CatAction>, state: StateProvider<CatState>): Flow<CatAction> {
return actions
.filterNot { state.isLoading }
.mapNotNull { it as? CatAction.FetchCat }
.flatMapConcat { action ->
flow {
val cats = fetchCats()
emit(UpdateCats(cats))
}
.takeUntil(actions.mapNotNull { it as? CatAction.CancelFetch })
}
}
The example you've given still has the problem of collection actions twice and gives no guarantee on how back-to-bach FetchCat and CancelFetch pair is processed. I don't see how takeUntil could be used to write a reliable code for your specific use-case.
You use-case can be implemented with some kind of dropWhenBusy (see #1798) that would also take a filtering lambda to specify which events need to be dropped while downstream is busy, so that you can write it like this. I've also corrected the code to use transformLatest instead of flatMapLatest.
fun catFetcher(actions: Flow<CatAction>): Flow<CatAction> {
return actions
.dropWhenBusy { it is CatAction.FetchCat } // ignore featch requests when working on prev
.transformLatest { action ->
// this block will get cancelled as soon as cancel action comes in
if (action is CatAction.FetchCat) emit(UpdateCats(fetchCats()))
}
}
Yeah. It can be implemented with dropWhenBusy.
I think dropWhenBusy + transformLatest in this case has a very non-obvious behavior. Due to the fact that we cancel not only for the CancelFetch, but for everything except the FetchCat.
fun catFetcher(actions: Flow<CatAction>): Flow<CatAction> {
return actions
.dropWhenBusy { it is CatAction.FetchCat } // ignore featch requests when working on prev
.filter { it is CatAction.FetchCat || it is CatAction.CancelFetch } // ignore actions
.transformLatest { action ->
// this block will get cancelled as soon as cancel action comes in
if (action is CatAction.FetchCat) emit(UpdateCats(fetchCats()))
}
}
@rougsig What alternative would you suggest?
An operator that can cancel Flow on an event from another Flow. takeUntil(other Flow) cat do that logic.
In basic some rx conditional operators.
@rougsig Do you have a use-case for cancelling a flow on an event from _another_ flow? The use-case we've been discussing so far was about cancelling a flow on an event from _the same_ flow.
Sure:
sealed class LifeCycleAction {
object Create: LifeCycleAction()
object Destroy: LifeCycleAction()
}
sealed class CatAction {
object StartFetch: CatAction()
data class UpdateCats(val cats: List<Cat>): CatAction()
}
val lifeCycle: Channel<LifeCycleAction>
val requests: Channel<CatAction>
val cats: Flow<List<Cat>>
// all computation will be calculated on other context.
// in that reason we can not use job.cancel()
fun catFetcher(actions: Flow<CatAction>, lifeCycle: Flow<LifeCycleAction>): Flow<CatAction> {
return actions
.filterNot { state.isLoading }
.mapNotNull { it as? CatAction.StartFetch }
.flatMapConcat { action ->
flow {
val cats = fetchCats()
emit(UpdateCats(cats))
}
.takeUntil(actions.mapNotNull { it as? CatAction.CancelFetch })
}
}
class Screen: CoroutineScope {
fun onCreate() {
launch {
lifeCycle.send(LifeCycleAction.Create)
requests.send(CatAction.StartFetch)
}
launch {
cats.collect { cats -> renderCats(cats) }
}
}
fun onDestroy() {
launch { lifeCycle.send(LifeCycleAction.Destroy) }
}
}
But in my pet app i have one Flow with all application Actions.
So in that case i can cancel flow like this:
fun catFetcher(actions: Flow<CatAction>): Flow<CatAction> {
return actions
.dropIf { it is CatAction.CancelFetch }
.mapNotNull { it as? CatAction.FetchCat }
.map { action ->
emit(UpdateCats(fetchCats()))
}
}
@rougsig I'm a bit lost. You're showing the same code that, in essence, does:
actions .... /* some transformation */
.takeUntil(actions.mapNotNull { it as? CatAction.CancelFetch })
// ^^^^^^^^ uses _the same_ "actions" flow here
It does not work this way. You can get really weird results. Do _you_ actually have a use-case where it comes from different flows?
No. In real it only one Flow.
Then you need some other solution to your problem that would not use takeUntil, but something else.
Tomorrow I will add comment with link to the simple Android application for this case.
I creating pet project with flow being inspired by https://redux-observable.js.org/
Recipe of cancellation uses takeUntil operator.
import { ajax } from 'rxjs/ajax';
const fetchUserEpic = action$ => action$.pipe(
ofType(FETCH_USER),
mergeMap(action => ajax.getJSON(`/api/users/${action.payload}`).pipe(
map(response => fetchUserFulfilled(response)),
takeUntil(action$.pipe(
ofType(FETCH_USER_CANCELLED)
))
))
);
Why can I get really strange results? in this case:
actions .... /* some transformation */
.takeUntil(actions.mapNotNull { it as? CatAction.CancelFetch })
// ^^^^^^^^ uses _the same_ "actions" flow here
So all the cases I had can be solved using transformLatest and combining the start and cancel events into one. Thank you.
This would be a valid use-case in my opinion for takeUntil:
val cancelChannel: BroadCastChannel<Unit>
val intervalFlow: Flow<Unit> // emits every x TimeUnit
intervalFlow
.takeUntil(cancelChannel.asFlow())
.onEach { /* do something */ }
.launchIn(someScope)
// later
cancelChannel.offer(Unit) // after this, intervalFlow should no longer emit
@floschu Where you might encounter a use-case like this? You can use structured concurrency for this kind of cancellation. Can you elaborate on your example, please.
Why can I get really strange results? in this case:
@rougsig Did you figure it out?
@floschu Where you might encounter a use-case like this? You can use structured concurrency for this kind of cancellation. Can you elaborate on your example, please.
If the intervalFlow is not launched by me, I would not know how to cancel it via structured concurrency, maybe you could elaborate on this.
A more specific use-case for this would be if a Flow is launched in a library and I can only transform it:
library:
package com.library
abstract class Store<Action, Mutation, State> {
protected val actions = BroadCastChannel<Action>(BUFFERED)
open var mutator: (action: Action) -> Flow<Mutation> = { emptyFlow() }
init {
actions.asFlow()
.flatMapMerge { action -> mutator(action) }
.scan(initialState) { previousState, mutation -> reduce(previousState, mutation) }
.onEach { /* publish state */ }
.launchIn(someScope)
}
}
implementation:
package com.mine
enum class Action { LOAD }
class MyStore : Store<Action, Int, Int> {
override var mutator = { action ->
when(action) {
Action.LOAD -> flow {
val value: Int = someSuspendingOperation()
emit(value)
}.takeUntil(actions.asFlow().filter { it is Action.LOAD })
}
}
}
In this case I want the created flow { ... } in MyStore.mutator to not emit its value when another Action.LOAD comes in.
@floschu Your latest example is not reliable. You are, again, collecting the same actions flow twice.
How to write it correctly? I don't have a ready answer. I'll need to know more about this kind of architecture to find a solution. Would Action.LOAD be the only action that flows over actions flow? If there are other kinds of actions in actions flow, then how does it work? Can you give a worked out example with multiple types of actions (if they are needed)?
@floschu Your latest example is not reliable. You are, again, collecting the same
actionsflow twice.How to write it correctly? I don't have a ready answer. I'll need to know more about this kind of architecture to find a solution. Would
Action.LOADbe the only action that flows overactionsflow? If there are other kinds of actions inactionsflow, then how does it work? Can you give a worked out example with multiple types of actions (if they are needed)?
I do not see how an example with multiple actions would make you understand the architecture better 馃槄
I have an elaborate example for you, but it would be a bit of a bigger read I guess:
In general I need a way to cancel the Flow that is created in the ControllerImplementation.mutator. Could be with the same Action, could be with another Action or could be with another Flow entirely (maybe there is a global Flow somewhere outside of the Controller that needs to cancel the mutation).
In RxJava I would have used the .takeUntil(Observable) operator but not sure how to implement it best with coroutines.
@elizarov I this operator is most useful when a flow is used to represent a UI event.
// timer is a flow that emits every second, endlessly
val timer = timerFlow()
runButton.clicks()
.flatmap(timer.takeUntil(stopButton.clicks()))
.onEach {
... update ui ....
}
.launchIn(this)
this is a common use-case in RXjava (see rxbindings) and it is already been replicated with co-routines (see flow bindings)
@RoryKelly timerFlow-like thing is in our backlog for a while. Please, follow #540 for details. However, the timer.takeUntil(someEvent) is an interesting pattern, indeed.
The example you've posted is relatively safe, since it actually combines three different flows (runButton, stopButton, timer), but is still conceptually prone to the same asynchronous reordering issue (click on "run" button followed by a fast click on "stop" button can leave the UI running and ticking). It should work correctly in UI thread (but no hard guarantees even there) and it can badly backfire when combined with flowOn and background threads.
@elizarov I think the threading issues caused by flowOn on are generally managed by the implementation of clicks.
@CheckResult
@UseExperimental(ExperimentalCoroutinesApi::class)
fun View.clicks(): Flow<Unit> = callbackFlow {
checkMainThread()
val listener = View.OnClickListener {
safeOffer(Unit)
}
setOnClickListener(listener)
awaitClose { setOnClickListener(null) }
}.conflate()
The coroutine would throw when the flowOn operator is used incorrectly and its up to the user to implement error handling.
In other stream implementations, the Ux problem you outlined click on "run" button followed by a fast click on "stop" button can leave the UI running and ticking are handled by operators like share/cache/refcount etc.
I think takeUntil would allow the use of flows in the UI layer/simplify complex flow, a lot of the community is using the approach, in the rxjava world and I can see coroutines moving in that direction. see https://github.com/freeletics/CoRedux and https://github.com/freeletics/RxRedux
@ExpensiveBelly This is exactly the link posted by topic starter at the start of the topic :-)
My use case is I'd like to collect flow A _only while_ some condition is true represented by flow B. A takeUntil(Flow) operator is typically what I use to implement (part of) this.
A real world example is collecting location data _only while_ the user has granted permission to do so.
Most helpful comment
@elizarov I think the threading issues caused by
flowOnon are generally managed by the implementation ofclicks.The coroutine would throw when the
flowOnoperator is used incorrectly and its up to the user to implement error handling.In other stream implementations, the Ux problem you outlined
click on "run" button followed by a fast click on "stop" button can leave the UI running and tickingare handled by operators likeshare/cache/refcountetc.I think
takeUntilwould allow the use of flows in the UI layer/simplify complex flow, a lot of the community is using the approach, in the rxjava world and I can see coroutines moving in that direction. see https://github.com/freeletics/CoRedux and https://github.com/freeletics/RxRedux