Hello there. Consider this observable
Observable.just(1)
.flatMap( i -> {
Timber.d("FLATMAP PRE-1: %d - %s", Thread.currentThread().getId(), Thread.currentThread().getName());
return Observable.create(s -> {
Timber.d("FLATMAP PRE-1-CALL: %d - %s", Thread.currentThread().getId(), Thread.currentThread().getName());
s.onNext(i);
s.onCompleted();
})
.subscribeOn(Schedulers.io())
.flatMap( i -> {
Timber.d("FLATMAP POST-1: %d - %s", Thread.currentThread().getId(), Thread.currentThread().getName());
return Observable.create(s -> {
Timber.d("FLATMAP POST-1-CALL: %d - %s", Thread.currentThread().getId(), Thread.currentThread().getName());
s.onNext(i);
s.onCompleted();
})
.subscribe( i -> {
Timber.d("Subscribe: Thread: %d - %s", Thread.currentThread().getId(), Thread.currentThread().getName());
});
This will emit the following:
FLATMAP PRE-1: 1 - main
FLATMAP PRE-1-CALL: 1 - main
FLATMAP POST-1: 1 - main
FLATMAP POST-1-CALL: 1 - main
Subscribe: Thread: 1 - main
This means that it subscribeOn is "lost" when flatMap is called?
I don't understand this code as it doesn't compile as it is pasted. Here is my attempt to refactor it to work:
import rx.Observable;
import rx.schedulers.Schedulers;
public class FlatMapSubscribeOn {
public static void main(String... args) {
Observable.just(1)
.flatMap(i -> {
Timber.d("FLATMAP PRE-1: %d - %s", Thread.currentThread());
return Observable.create(s -> {
Timber.d("FLATMAP PRE-1-CALL: %d - %s", Thread.currentThread());
s.onNext(i);
s.onCompleted();
});
})
.subscribeOn(Schedulers.io())
.flatMap(i -> {
Timber.d("FLATMAP POST-1: %d - %s", Thread.currentThread());
return Observable.create(s -> {
Timber.d("FLATMAP POST-1-CALL: %d - %s", Thread.currentThread());
s.onNext(i);
s.onCompleted();
});
}).toBlocking().forEach(i -> {
Timber.d("Subscribe: Thread: %d - %s", Thread.currentThread());
});
;
}
public static class Timber {
public static void d(String log, Thread thread) {
System.out.printf(log + "\n", thread.getId(), thread.getName());
}
}
}
This outputs the following:
FLATMAP PRE-1: 12 - RxCachedThreadScheduler-1
FLATMAP PRE-1-CALL: 12 - RxCachedThreadScheduler-1
FLATMAP POST-1: 12 - RxCachedThreadScheduler-1
FLATMAP POST-1-CALL: 12 - RxCachedThreadScheduler-1
Subscribe: Thread: 12 - RxCachedThreadScheduler-1
Exactly what i meant. Isn't it suppoused to run flatMap Observable code within another thread?
so that PRE-1-CALL and possibly (POST-1-CALL) are on another thread?
Did i misunderstand something?
subscribeOn will run the code which perform the subscription into the schedculer given in subscribeOn.
So Observable.just(...) will executed in the io thread pool. And all sub sequent call too, if you don't give another Scheduler using observeOn.
ie:
Observable.just(1) // 1 will be emited in the IO thread pool
.subscribeOn(Schedulers.io())
.flatMap(...) // will be in the IO thread pool
.observeOn(Schedulers.computation())
.flatMap(...) // will be executed in the computation thread pool
.observeOn(AndroidSchedulers.mainThread())
.subscribe(); // will be executed in the Android main thread (if you're running your code on Android)
@dwursteisen thanks for your explanation I found this very helpful
@dwursteisen does it change anything is if I move the .subscribeOn in your example just above the .subscribe call?
It won't change anything.
Le mer. 20 mai 2015 11:11, Andrea Baccega [email protected] a
écrit :
@dwursteisen https://github.com/dwursteisen does it change anything is
if I move the .subscribeOn in your example just above the .subscribe call?—
Reply to this email directly or view it on GitHub
https://github.com/ReactiveX/RxJava/issues/2925#issuecomment-103818922.
Thanks @dwursteisen for the explanation
Important thing to note though, that if Observables inside flatMap would subscribe on some other thread, then these flatMaps would override initial thread. E.g.:
public void main() {
Observable.just(1)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io()) // this is put just for demonstration
.doOnNext(this::printThread) // IO
.flatMap(i -> incrementOnIoThread1(i)) // or incrementOnIoThread2(i)
.subscribe(this::printThread // computation
, throwable -> {});
}
public Observable<Integer> incrementOnIoThread1(int i) {
return Observable.just(i + 1)
.observeOn(Schedulers.computation());
}
public Observable<Integer> incrementOnIoThread2(int i) {
return Observable.just(i + 1)
.subscribeOn(Schedulers.computation());
}
The only thing we can to alleviate this, is to add observeOn after flatMap, as Karnok said here.
Most helpful comment
subscribeOnwill run the code which perform the subscription into the schedculer given insubscribeOn.So
Observable.just(...)will executed in the io thread pool. And all sub sequent call too, if you don't give anotherSchedulerusingobserveOn.ie: