Hi, I am using RxJava 2.1.1. I thought I knew how subscribeOn works, but sometimes I still have some doubts about it.
As far as I know, for subscribeOn it doesn't metter the position where you use it (in the chain).
Assuming that I am executing this code from my MAIN THREAD.
Observable.just("")
.doOnSubscribe(/* NEW THREAD */)
.subscribeOn(Schedulers.newThread())
.subscribe(/* NEW THREAD */);
As expected the doOnSubscribe() is called on the NEW THREAD, because of subscribeOn(Schedulers.newThread())
But, if I call subscribeOn before doOnSubscribe the result will be different:
Observable.just("")
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(/* MAIN THREAD */)
.subscribe(/* NEW THREAD */);
Now the doOnSubscirbe() seems to be executed in the MAIN THREAD, regardless of subscribeOn(Schedulers.newThread()).
Why is this happening?
subscribeOn moves the subscription side-effects of the upstream to another thread but doesn't affect the downstream's subscription side-effects. Therefore, it matters were you put it if you have subscription side-effecting operators before it.
Let's walk through what happens in the first case. The chain consist of Just -> DoOnSubscribe -> SubscribeOn -> LambdaObserver operators but the subscription process walks in reverse:
| Step | Thread | Action |
|----|----|---|
| 1 | main | Creation of LambdaObserver |
| 2 | main | SubscribeOn.subscribe(LambdaObserver) |
| 3 | main | SubscribeOn calls onSubscribe on LambdaObserver |
| 4 | main | SubscribeOn schedules a subscribe action on newThread |
| 5 | newthread | DoOnSubscribe.subscribe(SubscribeOnObserver) |
| 6 | newthread | Just.subscribe(DoOnSubscribeObserver) |
| 7 | newthread | Just calls onSubscribe on DoOnSubscribeObserver |
| 8 | newthread | DoOnSubscribe calls the lambda and then onSubscribe on SubscribeOnObserver |
| 9 | newthread | Since SubscribeOnObserver already called onSubscribe, the call returns to Just |
| 10 | newthread | Just emits onNext |
Now if doOnSubscribe is after subscribeOn:
| Step | Thread | Action |
|----|----|---|
| 1 | main | Creation of LambdaObserver |
| 2 | main | DoOnSubscribe.subscribe(SubscribeOnObserver) |
| 3 | main | SubscribeOn.subscribe(DoOnSubscribeObserver) |
| 4 | main | SubscribeOn calls onSubscribe on DoOnSubscribeObserver |
| 5 | main | DoOnSubscribe calls the lambda and then onSubscribe on LambdaObserver |
| 6 | main | SubscribeOn schedules a subscribe action on newThread |
| 7 | newthread | Just.subscribe(SubscribeOnObserver) |
| 8 | newthread | Just calls onSubscribe on SubscribeOnObserver |
| 9 | newthread | Since SubscribeOnObserver already called onSubscribe, the call returns to Just |
| 10 | newthread | Just emits onNext |
That's a perfect explanation. Thank you very much.
Looks like this question has been answered. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.
Hi @akarnokd,
I too had doubts on the ordering of subscribeOn when there are multiple side effects and chains (using a flatMap) involved, I wanted to confirm if I got the logic correctly. Your help would be appreciated! I've tried to hopefully keep the code understandable.
In the below code, the general requirement is that any calls to readingDao should occur on rxSchedulers.database; and any calls to api should occur on rxSchedulers.network:
Single.fromCallable { readingDao.getNextUploadBatch() }
// Switch to the database thread, because of the Timber log of readingDao.getCount()
.subscribeOn(rxSchedulers.database)
// Run the 2nd task using the 1st task's result
.flatMapCompletable {
readings = it
// If readings is empty, then finish the job, and run an empty Completable
// so that it directly jumps to onComplete
if (readings.isEmpty()) {
Timber.d("Actual readings count in DB = ${readingDao.getCount()}")
jobFinished(job, false)
// Return a no-action Completable that immediately completes
Completable.complete()
}
// Else, upload the readings; run it on the network thread
else {
api.uploadSensorReadings(readings)
// Switch to the database thread for the below side effect tasks
.subscribeOn(rxSchedulers.database)
.doFinally {
jobFinished(job, !readingDao.isEmpty())
}
.doOnComplete {
readingDao.delete(*readings.toTypedArray())
}
// Switch to the network thread for the Completable API call
.subscribeOn(rxSchedulers.network)
}
}
// Run the Single on the database thread
.subscribeOn(rxSchedulers.database)
// Execute the below observer code on the database thread again
.observeOn(rxSchedulers.database)
.subscribe(
{ /* Already handled in doOnComplete */ },
{
if (readings.isNotEmpty()) {
readingDao.markCurrentReadingsAsNotUploading()
}
}
)
The original question (regarding conditionally chaining a Single and a Completable) was answered by you on this StackOverflow question.
Thanks for all your help in assisting me to understand Rx!
Daksh.
@dakshj Does it work as you expected? -> You are done! Does it behave unexpectedly? -> Isolate the problematic part and post a new question on StackOverflow.
It is working as expected! However I'm not sure how to programmatically check what the current Scheduler is. So, I wanted to run it by you if I indeed have understood the logic correctly!
I've also posted a StackOverflow question here. Thanks!
Most helpful comment
subscribeOnmoves the subscription side-effects of the upstream to another thread but doesn't affect the downstream's subscription side-effects. Therefore, it matters were you put it if you have subscription side-effecting operators before it.Let's walk through what happens in the first case. The chain consist of Just -> DoOnSubscribe -> SubscribeOn -> LambdaObserver operators but the subscription process walks in reverse:
| Step | Thread | Action |
|----|----|---|
| 1 | main | Creation of
LambdaObserver|| 2 | main |
SubscribeOn.subscribe(LambdaObserver)|| 3 | main |
SubscribeOncallsonSubscribeonLambdaObserver|| 4 | main |
SubscribeOnschedules a subscribe action onnewThread|| 5 | newthread |
DoOnSubscribe.subscribe(SubscribeOnObserver)|| 6 | newthread |
Just.subscribe(DoOnSubscribeObserver)|| 7 | newthread |
JustcallsonSubscribeonDoOnSubscribeObserver|| 8 | newthread |
DoOnSubscribecalls the lambda and thenonSubscribeonSubscribeOnObserver|| 9 | newthread | Since
SubscribeOnObserveralready calledonSubscribe, the call returns toJust|| 10 | newthread |
JustemitsonNext|Now if
doOnSubscribeis aftersubscribeOn:| Step | Thread | Action |
|----|----|---|
| 1 | main | Creation of
LambdaObserver|| 2 | main |
DoOnSubscribe.subscribe(SubscribeOnObserver)|| 3 | main |
SubscribeOn.subscribe(DoOnSubscribeObserver)|| 4 | main |
SubscribeOncallsonSubscribeonDoOnSubscribeObserver|| 5 | main |
DoOnSubscribecalls the lambda and thenonSubscribeonLambdaObserver|| 6 | main |
SubscribeOnschedules a subscribe action onnewThread|| 7 | newthread |
Just.subscribe(SubscribeOnObserver)|| 8 | newthread |
JustcallsonSubscribeonSubscribeOnObserver|| 9 | newthread | Since
SubscribeOnObserveralready calledonSubscribe, the call returns toJust|| 10 | newthread |
JustemitsonNext|