Let's say I have the following RxJava-based API client class:
class RxJavaBasedApi {
fun getUser(): Single<User> {
// Underlying implementation details may be Retrofit + RxJava 2
}
}
Now I wish to wrap this in a suspend function like so:
class UserApiClient(private val rxJavaBasedApi: RxJavaBasedApi) {
suspend fun getUser(): User {
return rxJavaBasedApi.getUser().await()
}
}
Then in an Android ViewModel, let's say I have the following:
class UserViewModel(
private val userApiClient: UserApiClient,
private val dispatchers: Dispatchers // Wrapper Dispatchers instance for DI purposes
) : ViewModel(),
CoroutineScope {
override val coroutineContext: CoroutineContext = Job() + dispatchers.main
init {
launch {
val user = userApiClient.getUser() // NetworkOnMainThreadException
}
}
...
}
When getUser() is called, the underlying Single is subscribed. By default the subscription runs synchronously and blocks the calling thread, since no subscribeOn()/observeOn() was applied to the chain.
Although Single.await() is a suspend function, in this particular case it blocks the thread on which it was invoked.
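The behaviour described above can be reproduced with a small sketch. It assumes RxJava 2 and kotlinx-coroutines-rx2 are on the classpath; subscribingThread() is a hypothetical stand-in for a Retrofit call built without a scheduler, not part of the original API.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.rx2.await
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Retrofit-backed call: the callable runs
// inside subscribe(), on whichever thread performs the subscription.
fun subscribingThread(): Single<Thread> =
    Single.fromCallable { Thread.currentThread() }

fun main() = runBlocking<Unit> {
    val caller = Thread.currentThread()

    // No scheduler: await() subscribes on the calling thread, so the work runs there.
    val withoutScheduler = subscribingThread().await()

    // With subscribeOn: the work moves to an RxJava IO thread and await() only suspends.
    val withScheduler = subscribingThread().subscribeOn(Schedulers.io()).await()

    println("ran on caller without scheduler: ${withoutScheduler === caller}")
    println("ran on caller with subscribeOn(io): ${withScheduler === caller}")
}
```

On Android the first case is what would surface as NetworkOnMainThreadException when the caller is the main thread.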
Questions:
1. Should the ViewModel take on the responsibility of having to wrap the call with withContext(dispatchers.io) in the init block?
2. Should UserApiClient instead take care of the implementation details of wrapping the call within withContext(dispatchers.io)? This avoids having to specify withContext in the ViewModel. If so:
   a. Should the Single.await() extension still be used? Does it provide any value within withContext?
   b. Should Single.blockingGet() be used instead? It would be a non-suspend invocation within withContext.
Personally I would use subscribeOn(Schedulers.io()). This is the Rx way of dispatching work on a given scheduler.
class RxJavaBasedApi {
fun getUser(): Single<User> =
Single.just(User()) // Placeholder for the underlying implementation, which may be Retrofit + RxJava 2
.subscribeOn(Schedulers.io()) // <-- The fact that a specific scheduler is needed is an implementation detail IMHO and therefore should be here.
}
Thanks for your input @jcornaz! I guess I'm so used to deferring the subscribeOn()/observeOn() all the way till the point of subscription:
Example:
mySingle()
.applyOtherOperators()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
// call backs here
)
This gives the subscriber full control of which Schedulers to schedule and observe from.
The other issue is that when building the Retrofit instance, we're using RxJava2CallAdapterFactory.create() as opposed to RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()). Hence, by default, the Single is invoked synchronously on the calling thread.
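For reference, the createWithScheduler alternative mentioned above might look roughly like this. It is only a sketch: the base URL is a placeholder, and it assumes the Retrofit 2 RxJava 2 adapter artifact is available.

```kotlin
import io.reactivex.schedulers.Schedulers
import retrofit2.Retrofit
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory

// Every Single/Observable returned by this Retrofit instance is subscribed
// on Schedulers.io() by default, instead of on the calling thread.
val retrofit: Retrofit = Retrofit.Builder()
    .baseUrl("https://example.invalid/") // placeholder URL, an assumption for illustration
    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
    .build()
```

With this configuration the scheduler choice lives in the Retrofit setup rather than at each call site.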
I guess for Coroutine newbies like myself, the documentation can be a little ambiguous when it states:
Awaits for completion of the single value without blocking a thread.
And yet it may actually block the thread.
Fortunately, https://medium.com/@elizarov/blocking-threads-suspending-coroutines-d33e11bf4761 helps clarify things a bit. 🙂
The problem arises from the fact that you have to control two levels of concurrency: one from Rx and one from coroutines. Yes, you can use subscribeOn, but if you have already opted in to a coroutine-based API (by making getUser a suspend function), it is better to use withContext.
Should the ViewModel take on the responsibility of having to wrap the call with withContext(dispatchers.io) in the init block?
Better not to. If your API is already suspending and IO-related, it should use withContext(IO) on its own. The scheduler and dispatcher are implementation details and should be encapsulated in the service itself (UserApiClient). That way it can provide a stable contract ("use me from anywhere"), be easily mocked in tests no matter what context it is used in, and have its context configured in a single place (the UserApiClient constructor).
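A minimal sketch of that encapsulation, assuming kotlinx-coroutines-rx2 is available. The User data class, the interface form of RxJavaBasedApi, and the ioDispatcher parameter name are hypothetical choices made for illustration.

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx2.await
import kotlinx.coroutines.withContext

// Hypothetical model and API shape, declared here only to keep the sketch self-contained.
data class User(val name: String)

interface RxJavaBasedApi {
    fun getUser(): Single<User>
}

class UserApiClient(
    private val rxJavaBasedApi: RxJavaBasedApi,
    // The dispatcher is configured once, here; tests can pass a different one.
    private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) {
    // Safe to call from any context, including the main thread:
    // the subscription happens on ioDispatcher.
    suspend fun getUser(): User = withContext(ioDispatcher) {
        rxJavaBasedApi.getUser().await()
    }
}
```

The ViewModel can then call userApiClient.getUser() directly inside launch, without knowing which dispatcher the client uses.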
Thanks for your reply @qwwdfsad. That is precisely what I am doing. The only other question I have is which would be considered more favorable?
class UserApiClient(private val rxJavaBasedApi: RxJavaBasedApi) {
suspend fun getUser(): User = withContext(Dispatchers.IO) {
rxJavaBasedApi.getUser().await() // suspend extension function on Single
}
}
or
class UserApiClient(private val rxJavaBasedApi: RxJavaBasedApi) {
suspend fun getUser(): User = withContext(Dispatchers.IO) {
rxJavaBasedApi.getUser().blockingGet() // http://reactivex.io/RxJava/javadoc/io/reactivex/Single.html#blockingGet--
}
}
I'm currently still using the await() suspend function on Single, but I'm not sure I see the added benefit over blockingGet() if both are still running within withContext(Dispatchers.IO).
Both are fine, though I'm biased towards .await just to make it idiomatic :)
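One practical difference that may be worth noting: await() is a cancellable suspension point, while blockingGet() parks the thread until the Single terminates. The sketch below illustrates this with Single.never() standing in for a request that never completes; awaitWithTimeout is a hypothetical helper name. For a Single that does its work synchronously inside subscribe(), as in the Retrofit case above, the two behave much the same.

```kotlin
import io.reactivex.Single
import kotlinx.coroutines.rx2.await
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: waits on a Single that never emits, with a timeout.
// await() suspends, so the timeout can cancel it and the subscription is disposed.
suspend fun awaitWithTimeout(timeoutMs: Long): String? =
    withTimeoutOrNull(timeoutMs) {
        Single.never<String>().await()
    }

fun main() = runBlocking<Unit> {
    println(awaitWithTimeout(200)) // prints "null" once the timeout fires
}
```

Replacing await() with blockingGet() in this sketch would keep the thread blocked, because blockingGet() does not observe coroutine cancellation.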
Do you mind if I close the issue?
Not at all. My question has been answered. Please feel free to close. Thank you all 🙂