In our code base we have found a lot of bugs where we call flowable.flatMapCompletable and the callback passed to flatMapCompletable runs multiple times and never stops. As we understand it, the issue is that a flowable never completes, so how does it make sense that we can call flatMapCompletable on it?
Example:
fun updateMember(member: Member): Completable {
    val flowable = loadMemberUseCase.find(memberId)
    return flowable.flatMapCompletable { member ->
        // This code here will be called multiple times
        memberRepository.save(member)
    }
}
Would it be crazy to remove the flatMapCompletable method from flowables?
In particular, our code base has Flowable as a return type directly in a DAO using Android Room, and eventually we call flatMapCompletable on it.
The code doesn't compile. flatMapCompletable's functions are required to return a Completable instance.
But otherwise, completion of inner stream types never affects the outer stream. This is true of the regular flatMap functions as well. Each returned stream is subscribed to and its emissions are flattened into the enclosing stream. Since Completable represents the reactive equivalent of a void method, each one is run and returns when complete. The outer stream is affected only if an error is thrown.
I think what might not be clear in liangmicha's example is that memberRepository.save(member) returns a completable, so the code does compile?
In general though, if you had the following code:
fun foo(): Completable {
    return flowable.flatMapCompletable {
        Completable.complete()
    }
}
wouldn't foo() never complete, since it would require the flowable itself to complete? The Javadoc for the method says:
/**
* Maps each element of the upstream Flowable into CompletableSources, subscribes to them and
* waits until the upstream and all CompletableSources complete.
but doesn't the upstream never complete if it's a flowable?
Given that this is the case, what would a use case actually be for the flatMapCompletable() method on a flowable? Apologies if I'm misunderstanding something here.
Thanks in advance for your time!
To be clear, an hour ago it said logger.log which didn't exactly seem like a method that would return Completable.
doesn't the upstream never complete if it's a flowable
That's a detail of the Flowable you choose. If it doesn't complete then neither will the one returned from flatMapCompletable. If it does, then so will the one returned from flatMapCompletable (once the nested Completables have also completed).
Try using Flowable.just(1) as your upstream and something like a Completable which sleeps for a second and you'll see the returned instance complete after 1 second.
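The experiment suggested above can be sketched roughly as follows. This is a minimal illustration, assuming RxJava 3 (`io.reactivex.rxjava3`) is on the classpath; the class name and the `completesWithin` helper are made up for the example and are not part of the library.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import java.util.concurrent.TimeUnit;

public class FlatMapCompletableDemo {

    // Finite upstream: the result completes once the upstream has completed
    // AND the single inner Completable (a 1 second timer) has completed.
    static Completable finite() {
        return Flowable.just(1)
                .flatMapCompletable(v -> Completable.timer(1, TimeUnit.SECONDS));
    }

    // Upstream that never terminates: the inner Completables are irrelevant,
    // the result cannot complete because the upstream never does.
    static Completable endless() {
        return Flowable.<Integer>never()
                .flatMapCompletable(v -> Completable.complete());
    }

    // Hypothetical helper: true if the Completable finished within the timeout.
    static boolean completesWithin(Completable c, long millis) {
        return c.blockingAwait(millis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        finite().blockingAwait();
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("finite() completed after about " + elapsedMs + " ms");

        System.out.println("endless() completed within 200 ms: "
                + completesWithin(endless(), 200));
    }
}
```

A Room query that returns Flowable behaves like `endless()` here: it keeps emitting on every table change and does not complete on its own, which would explain the callback running repeatedly.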
You have two options: use an erroring Completable to stop the flatMapCompletable and then suppress that exception or use takeUntil before or after the flow with a subject:
Flowable<T> someFlow = ...
CompletableSubject done = CompletableSubject.create();
someFlow.flatMapCompletable(v -> {
    if (v == 10) {
        done.onComplete();
        return Completable.complete();
    }
    return doStuffAsCompletable(v);
})
.takeUntil(done);
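The first option mentioned above, stopping with an erroring Completable and then suppressing that exception, might look something like the sketch below. It assumes RxJava 3 on the classpath; `StopSignal`, `processUntilTen`, `runDemo` and the body of `doStuffAsCompletable` are hypothetical names and stand-ins introduced only for illustration.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class StopWithSentinelError {

    // Hypothetical marker exception used only to signal "stop now".
    static final class StopSignal extends RuntimeException { }

    // Stand-in for the real per-item work: records the value it was given.
    static Completable doStuffAsCompletable(int v, List<Integer> log) {
        return Completable.fromAction(() -> log.add(v));
    }

    static Completable processUntilTen(Flowable<Integer> someFlow, List<Integer> log) {
        return someFlow
                .flatMapCompletable(v -> {
                    if (v == 10) {
                        // An inner error cancels the upstream and fails the result.
                        return Completable.error(new StopSignal());
                    }
                    return doStuffAsCompletable(v, log);
                })
                // Turn only the sentinel into a normal completion;
                // any other error still propagates.
                .onErrorComplete(e -> e instanceof StopSignal);
    }

    // Runs the flow against an upstream that would otherwise never complete.
    static List<Integer> runDemo() {
        List<Integer> log = new CopyOnWriteArrayList<>();
        Flowable<Integer> upstream =
                Flowable.range(1, 100).concatWith(Flowable.<Integer>never());
        boolean finished = processUntilTen(upstream, log)
                .blockingAwait(5, TimeUnit.SECONDS);
        if (!finished) {
            throw new IllegalStateException("flow did not terminate");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("processed: " + runDemo());
    }
}
```

One trade-off to keep in mind: the error also disposes any inner Completables that are still running, so this fits best when in-flight work may be abandoned.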
I like the takeUntil method, but unfortunately it's not available on a Completable, so it seems to me we have to work around it like this:
Flowable<T> someFlow = ...
// Flowable.takeUntil expects a Publisher, so a processor is used rather than a subject
PublishProcessor<Integer> done = PublishProcessor.create();
someFlow.takeUntil(done).flatMapCompletable(v -> {
    if (v == 10) {
        done.onNext(1);
        return Completable.complete();
    }
    return doStuffAsCompletable(v);
});
I think it's kind of heavy to write this, but unfortunately I don't see any other way.
I wonder if there could be some operator that completes the upstream when the downstream is complete. flatMap doesn't do it, and switchMap disposes too soon: as soon as a value is emitted, which won't work in my case using RxAndroidBle, because what is emitted is a BluetoothConnection and it would be closed before I can read from or write to it.
I would have thought we added takeUntil to Completable. It is practically a.ambWith(b) but ambWith is implemented as an N-ary operator underneath so there is room for a dedicated implementation. I'll post a PR for that shortly.
Otherwise, you can't terminate a flow from within a flatMapXXX, as there is no way to distinguish between a legitimately empty source and a stop source. That's why takeUntil was suggested before.
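For readers on a version where Completable does offer takeUntil (RxJava 3.x does; treat availability in any particular 2.x release as an assumption to verify), the earlier CompletableSubject approach could be sketched as below. The class name, `processUntilTen`, `runDemo` and the logging stand-in for `doStuffAsCompletable` are hypothetical and exist only for this example.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subjects.CompletableSubject;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

public class TakeUntilOnCompletable {

    static Completable processUntilTen(Flowable<Integer> someFlow, List<Integer> log) {
        // Completing this subject is the external "stop" signal.
        CompletableSubject done = CompletableSubject.create();
        return someFlow
                .flatMapCompletable(v -> {
                    if (v == 10) {
                        done.onComplete();
                        return Completable.complete();
                    }
                    // Stand-in for doStuffAsCompletable(v): record the value.
                    return Completable.fromAction(() -> log.add(v));
                })
                // When 'done' terminates, the upstream flow is disposed and
                // the resulting Completable completes.
                .takeUntil(done);
    }

    // Runs the flow against an upstream that would otherwise never complete.
    static List<Integer> runDemo() {
        List<Integer> log = new CopyOnWriteArrayList<>();
        Flowable<Integer> upstream =
                Flowable.range(1, 100).concatWith(Flowable.<Integer>never());
        boolean finished = processUntilTen(upstream, log)
                .blockingAwait(5, TimeUnit.SECONDS);
        if (!finished) {
            throw new IllegalStateException("flow did not terminate");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("processed: " + runDemo());
    }
}
```

Compared with the sentinel-error variant, the stop signal here lives outside the error channel, so genuine failures from the inner Completables are not at risk of being swallowed by an overly broad error filter.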