RxJava version 2.1.2.
The following code:
Single.just("Test")
.subscribeOn(Schedulers.computation())
.flatMapObservable(
s -> {
System.out.println("1: " + Thread.currentThread());
return Observable.just(1)
.observeOn(Schedulers.io())
//.doOnNext(o -> System.out.println("2: " + Thread.currentThread()))
;
}
)
.subscribe(o -> {
System.out.println("3: " + Thread.currentThread());
});
...produces the following output:
1: Thread[RxComputationThreadPool-1,5,main]
3: Thread[RxComputationThreadPool-1,5,main]
Expected output here is:
1: Thread[RxComputationThreadPool-1,5,main]
3: Thread[RxCachedThreadScheduler-1,5,main]
However, when the line containing doOnNext is uncommented, this is the output:
1: Thread[RxComputationThreadPool-1,5,main]
2: Thread[RxCachedThreadScheduler-1,5,main]
3: Thread[RxCachedThreadScheduler-1,5,main]
It looks like the thread is not switched in the first case.
FlatMap merges the sources on one of the emitting threads, sometimes that means one thread will emit someone else's produced value. This could also be the thread the inner sources are generated, in case the inner source completes fast enough. The inner doOnNext clearly indicates that the 1 is processed on the desired scheduler.
There is a flatMapAsync extensions transformer that collects and reemits values on a specific scheduler.
flatMap indeed can emit downstream value on upstream's thread if upstream completes (or errors) and value from inner observable arrives fast enough, this is caused by drain() call in onComplete() callback from upstream.
@akarnokd is there any particular reason why we need to run drain when upstream completes? Threading is very important sometimes, more determinism here would be great.
@nhaarman your example can be rewritten with Observable.create() instead of Single.just() and removing/delaying completion event of upstream observable causes flatMap to emit on inner's observable thread.
TestObserver<Object> ts = new TestObserver<Object>();
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Test");
// e.onComplete(); // Uncomment to reproduce emission on computation thread.
}
})
.subscribeOn(Schedulers.computation())
.flatMap(
new Function<String, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(String s) throws Exception {
System.out.println("1: " + Thread.currentThread());
return Observable
.just(1)
.observeOn(Schedulers.io());
}
}
)
.doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("3: " + Thread.currentThread());
}
})
.subscribe(ts);
ts.awaitTerminalEvent();
Ah I see, we need to complete downstream if upstream and all inner observables completed, that's why drain..
Well, I guess it's possible to rewrite flatMap to only "drain" complete events when upstream completes, but that can complicate operator implementation.
I'm not sure if this is working as intended, or a bug. The somewhat non-deterministic behavior is quite confusing, and adding logging to find out what's happening actually fixes the 'issue', reminding of the Heisenbug.
Especially on Android, this behavior may lead to crashes if the value is not emitted on the main thread.
Use observeOn to route events to the desired thread, for example, before subscribe().
My real-world scenario currently requires observeOn to happen inside the flatMapObservable, due to abstractions.
Especially on Android, this behavior may lead to crashes if the value is not emitted on the main thread
That's why peoply, by reflex, apply .observeOn(AndroidSchedulers.mainThread()) to route the results back to the main thread.
The problem with that is you end up delaying frames and breaking sources and emit synchronously. The general recommendation is to push observeOn as far "up" into the streams as possible so the fact that they may not be honored is a bit disturbing.
That's why peoply, by reflex, apply
.observeOn(mainThread())to route the results back to the main thread.
And it's fine, because it feels kinda wrong for the downstream to rely on scheduling applied in inner observable of the flat(switch)Map.
The problem with that is you end up delaying frames and breaking sources and emit synchronously.
Delaying is understandable and usually fine since upstream is async anyways and we're talking milliseconds here, but can you please elaborate on "breaking sources and emit synchronously"?
The general recommendation is to push
observeOnas far "up" into the streams as possible so the fact that they may not be honored is a bit disturbing.
I would say that you need to apply observeOn(scheduler) as soon as you actually need to notify downstream/consumer on that particular scheduler, which is kinda opposite to "as far up into the streams as possible".
Especially with Main thread because you usually want to perform as less work on that thread as possible.
So I would say that this issue is actually not a problem, but more something people should know about.
Also looks like there is no alternative way of implementing flat(switch)Map operator so it would only emit on scheduler of the inner observable(s) keeping all existing properties of the operator. Because operator's behavior relies on upstream events (completion and error in particular). @akarnokd, please correct me if I'm wrong.
If you bind data, delaying a frame will cause a noticeable UI problem.
Well said @artem-zinnatullin.
@JakeWharton you can bind like that:
upstream
.observeOn(mainScheduler)
.startWith(initialValueOrObservable)
.subscribe { }
If you can guarantee that subscribe is called from main thread (which usually true for UI related code) and observable in startWith() doesn't switch thread.
This should be possible to implement pretty elegantly with Kotlin type system in a way, that'll remove startWith on consumer side.
Another option is to create "fast-path" version of AndroidSchedulers.mainScheduler() that runs action without scheduling if execution already happens on the main thread.
However @akarnokd and others had good arguments against it. https://github.com/ReactiveX/RxAndroid/pull/228
But generally speaking skipping one frame during initial value binding doesn't look like a big issue, especially with transition animations (maybe I'm wrong). And whenever it's a big issue — Rx is probably not a good solution for precise frame by frame rendering.
You are wrong, yes. As a layout pass would have happened you'll trigger
animation.
On Tue, Aug 15, 2017, 3:27 AM Artem Zinnatullin :slowpoke: <
[email protected]> wrote:
@JakeWharton https://github.com/jakewharton you can bind like that:
upstream
.observeOn(mainScheduler)
.startWith(initialValueOrObservable)
.subscribe { }If you can guarantee that subscribe is called from main thread (which
usually true for UI related code) and observable in startWith() doesn't
switch thread.This should be possible to implement pretty elegantly with Kotlin type
system in a way, that'll remove startWith on consumer side.Another option is to create "fast-path" version of
AndroidSchedulers.mainScheduler() that runs action without scheduling if
execution already happens on the main thread.However @akarnokd https://github.com/akarnokd and others had good
arguments against it. ReactiveX/RxAndroid#228
https://github.com/ReactiveX/RxAndroid/pull/228But generally speaking skipping one frame during initial value binding
doesn't look like a big issue, especially with transition animations (maybe
I'm wrong). And whenever it's a big issue — Rx is probably not a good
solution for precise frame by frame rendering.—
You are receiving this because you were mentioned.Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/issues/5550#issuecomment-322433382,
or mute the thread
https://github.com/notifications/unsubscribe-auth/AAEEEbMSFSoZdly_YAktN5eaxTbmwhUoks5sYXKBgaJpZM4O0sjY
.
flatMap is an asynchronous boundary operator and as such, there is no guarantee what thread it will emit on, especially if its main input and inner sources have their own scheduler. That's why, in general, when there is doubt, apply observeOn to ensure the desired thread does the emission.
@artem-zinnatullin
is there any particular reason why we need to run drain when upstream completes?
Because otherwise items would remain indefinitely in the internal buffers and your stream would hang. If backpressure is involved, that's wosre as the downstream won't even request more elements until it received the previously requested amount, which livelocks the stream.
Threading is very important sometimes, more determinism here would be great.
That's why you need observeOn (or flatMapAsync) to give that deterministic emission thread.
If you bind data, delaying a frame will cause a noticeable UI problem.
The issue is that flatMap() can emit on upstream thread instead of inner.
@JakeWharton if you "bind data", what is the scenario when this particular "feature" of flatMap() causes delay that would not occur if flatMap() would guarantee emission on inner thread?
Pseudo-code:
upstream
.flatMap { inner }
.subscribe { bind(it) }
You are wrong, yes
Ah, my favorite passive-aggressive open source communication style. Definitely encourages people to collaborate and look for solutions :trollface: 😽
I don't have time to pick up this thread right now, but just wanted to clarify there's nothing passive about that comment. You postulated that you might be wrong, and I did you the service of clarifying.
Sorry one more thing and I'll stop being off-topic. There's a beautiful irony in your comment as it's thus far the only thing that's passive-aggressive.
there's nothing passive about that comment
So… just "aggressive" instead of "passive-aggressive"? Okay then :trollface:
There's a beautiful irony in your comment as it's thus far the only thing that's passive-aggressive
You were obviously reading this on device that doesn't render emojis, there is :kissing_cat: at the end!
And there is a slight, but noticeable difference between being sarcastic and passive-aggressive. (like this dot at the end here or "I did you the service of clarifying.")
😽 (:kissing_cat:)
But seriously speaking, I wasn't able to find upstream.flatMap { inner }.bind() scenario that fits to possible problem described by you and could be fixed by guaranteed emission on inner thread.
Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
Most helpful comment
If you bind data, delaying a frame will cause a noticeable UI problem.