Retrofit: Observer with observeOn on Schedulers.currentThread is not called on currentThread

Created on 4 Jan 2014  路  9Comments  路  Source: square/retrofit

I'm new to RxJava so forgive me if I made the wrong assumption.
I assumed from the RxJava documentation that the Observer is called from the thread which is set by the observeOn call of an Observable.

In my case I set the Schedulers as follows:
.subscribeOn(Schedulers.threadPoolForIO()).observeOn(Schedulers.currentThread())

So I was expecting that the onNext is called from the current thread but it appears that onNext is called from the "Retrofit-Idle" thread instead.

It seems that the observeOn is completely ignored.
Is this as designed or is this an error?

Most helpful comment

To answer the JVM question, you need an Executor that executes the runnables back on the calling thread. One solution is posting to a BlockingQueue that's being consumed on the caller's thread. Something like this (kotlin),

  val tasks = LinkedBlockingQueue<Runnable>()

  Observable.fromCallable {"Hello" }
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.from(Executor { runnable -> tasks.add(runnable) }))
    .subscribe { println("Back on the original thread:  " + Thread.currentThread().name) }

  tasks.take().run()

All 9 comments

This is by design. It's your job to observe on whatever thread you want.

fooService.doSomething("Hi!")
  .map({ o -> o.getList() })
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe({ adapter.setList(it); })

This is documented on RestAdapter and on the website:

Observable requests are subscribed asynchronously and observed on the same thread that executed the HTTP request. To observe on a different thread (e.g. Android's main thread) call observeOn(Scheduler) on the returned Observable.

Are you on the JVM? I'm not familiar with the semantics of exactly how Schedulers.currentThread() works since I've never used it.

Retrofit shouldn't exhibit any behaviors that are different from normal RxJava. Internally we are just calling Observable.create(..).observeOn(..) and then returning it.

I dug a litter deeper. If I do not use the Observable return type from a service call and then wrap the response in an Observable, the observeOn and subcribeOn are performed on the correct threads.

If I use the Observable return type from a service call then the onNext is not performed on the expected thread set by the observeOn(Schedulers.currentThread) but on the "Retrofit-idle" thread instead.

Maybe I'm doing something wrong.

Retrofit is behaving correctly, but RxJava can be confusing. Schedulers.currentThread() is badly named and does not do what you think it does. It does not refer to the thread on which you call observeOn(); it just means that no thread-hopping will be performed when the observer is called. I made the same mistake when starting with RxJava myself.

Generally speaking you want subscribeOn(AndroidThreads.mainThread()). If you're in a background thread when you make the call and you want to process the response on that same thread, try AndroidSchedulers.handlerThread(Handler).

Thank you loganj. I did indeed made the same mistake.

@loganj what if you are on JVM and therefore do not have access to AndroidThreads.mainThread() ? Ideally I would like to create a scheduler that basically runs on the thread on which .subscribe is called on

To answer the JVM question, you need an Executor that executes the runnables back on the calling thread. One solution is posting to a BlockingQueue that's being consumed on the caller's thread. Something like this (kotlin),

  val tasks = LinkedBlockingQueue<Runnable>()

  Observable.fromCallable {"Hello" }
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.from(Executor { runnable -> tasks.add(runnable) }))
    .subscribe { println("Back on the original thread:  " + Thread.currentThread().name) }

  tasks.take().run()

@guelo so that just basically blocks the calling thread until, tasks.take() returns which is when the observable emits.

Correct. You need some kind of strategy to allow the thread to cede control and execute work asynchronously posted from another thread. In UI and game code that's usually provided for you as an event loop, which is what AndroidSchedulers.mainThread() hooks into. If you're doing your own asynchronous callbacks you have to come up with your own strategy.

Was this page helpful?
0 / 5 - 0 ratings