Let's say I have the following RxJava-based code:
fun loadUser(): Single<User> {
    // load the user
}
fun observeChanges() {
    myColdObservable
        .flatMapSingle { loadUser() }
        .subscribeOn(schedulers.io())
        .observeOn(schedulers.mainThread())
        .subscribeBy(
            onNext = { /* Update UI */ }
        )
        .addTo(compositeDisposable)
}
And now I wish to change loadUser() to:
suspend fun loadUser(): User {
    // load the user
}
Let's just say the code base relies heavily on RxJava for reactive streams but wishes to convert all Completables, Maybes, and Singles, to suspend functions. Is there a recommended way to begin introducing these suspend functions while still being able to rely on the existing Observable streams such as the example above?
I've looked at https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/index.html, however these extensions require a CoroutineScope due to structured concurrency. Would this perhaps be an acceptable case to use runBlocking?
I recently came across the same problem. Currently I'm using this extension, which basically wraps the coroutine in a runBlocking and ensures it runs on a background thread:
fun <T : Any, R : Any> Flowable<T>.suspendMap(transform: suspend (T) -> R): Flowable<R> {
    return this
        .map { item -> runBlocking { transform(item) } }
        .subscribeOn(Schedulers.computation())
}
But I also really would like to know what would be the recommended way of doing so.
I would recommend the following steps:
Use a CoroutineScope instead of compositeDisposable to keep track of "operations in progress": val scope = MainScope()
You need to cancel the scope (scope.cancel()) at the same place where you'd dispose your composite.
You can start changing your use-site to coroutines first:
fun observeChanges() {
    scope.launch {
        myColdObservable
            // ...whatever your chain was, up to .observeOn(schedulers.mainThread())
            .consumeEach { /* Update UI */ }
    }
}
suspend fun loadUser(): User {
    // load the user
}
You might also find Christina Lee's talk "Coroutines by Example" useful: https://www.youtube.com/watch?v=lh2Vqt4DpHU
Does it help?
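To make the steps above concrete, here is a minimal sketch of a presenter that owns a scope, collects an Observable inside scope.launch, and cancels the scope where it would previously have cleared its CompositeDisposable. ChangesPresenter, seen, and onDestroy are hypothetical names. The sketch assumes a kotlinx-coroutines-rx2 version that ships ObservableSource.collect (on the version discussed in this thread the call would be consumeEach), and it uses a plain scope on Dispatchers.Default instead of MainScope() so that it runs on a desktop JVM.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx2.collect

// Hypothetical presenter: the scope plays the role of the CompositeDisposable.
class ChangesPresenter(
    private val changes: Observable<String>,
    // Assumption: an Android presenter would pass MainScope() here.
    private val scope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
) {
    // Stand-in for "update the UI": records every received item.
    val seen = CopyOnWriteArrayList<String>()

    // Subscribes to the Observable inside a coroutine owned by the scope.
    fun observeChanges(): Job = scope.launch {
        changes.collect { item -> seen.add(item) }
    }

    // Call this where compositeDisposable.clear() used to be called.
    fun onDestroy() = scope.cancel()
}

fun main() = runBlocking {
    val presenter = ChangesPresenter(Observable.just("a", "b", "c"))
    presenter.observeChanges().join() // wait until the finite stream completes
    println(presenter.seen)
    presenter.onDestroy()
}
```

Cancelling the scope cancels the launched coroutine, which in turn disposes the underlying Rx subscription.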
You can also take advantage of already provided kotlinx-coroutines-rx2 library so you don't need to bother with creating scopes every time:
myObservable.flatMap {
    GlobalScope.publish {
        send(loadUser())
    }
}
I don't have the code with me, but the above can easily be converted into an inline extension method, so you can consume it like this:
```
myObservable.flatMapSingleSuspend { loadUser() }
```
I think the use of GlobalScope is perfectly fine here, since the coroutine will be cancelled anyway whenever Rx unsubscribes.
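Since the helper itself is not shown above, here is one way flatMapSingleSuspend might look. This is only a sketch: the function body is an assumption, loadUserName is a hypothetical stand-in for loadUser(), and it is written as a regular (non-inline) extension for simplicity. It uses the top-level rxSingle builder, which I believe is available from kotlinx-coroutines-rx2 1.3.0 onward; on the version discussed in this thread the equivalent call is GlobalScope.rxSingle { ... }.

```kotlin
import io.reactivex.Observable
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical helper, named after the call site shown above.
// Each item is mapped to a Single that runs the suspend lambda on subscription.
fun <T : Any, R : Any> Observable<T>.flatMapSingleSuspend(
    transform: suspend (T) -> R
): Observable<R> = flatMapSingle { item -> rxSingle { transform(item) } }

// Hypothetical stand-in for the real suspend function.
suspend fun loadUserName(id: Int): String {
    delay(10)
    return "user-$id"
}

fun main() {
    val names = Observable.just(1, 2, 3)
        .flatMapSingleSuspend { id -> loadUserName(id) }
        .toList()
        .blockingGet()
    // flatMapSingle does not guarantee ordering, so sort before printing.
    println(names.sorted())
}
```

Because rxSingle only starts its coroutine when the Single is subscribed to, and cancels it when the subscription is disposed, no thread is blocked inside the operator.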
Thank you all for your input! Much appreciated.
@elizarov your suggestion to wrap the stream within a Coroutine seems like a great approach for lifecycle aware components such as a presenter or view model where the subscription is actually happening. I will most likely follow this approach for such cases.
However, sometimes a separate class may return an Observable which internally may require other dependencies for composing the underlying stream.
Here is another example:
class GetCurrentUser {
    // Wish to convert this into a suspend function.
    operator fun invoke(): Single<User> {
        // Return current user
    }
}
class FriendRepository {
    fun observeFriendsForUserId(userId: String): Observable<List<Friend>> {
        // Perform database query from Room or SqlDelight
        // that returns Observable.
    }
}
class ObserveCurrentUserFriends(
    private val getCurrentUser: GetCurrentUser,
    private val friendRepository: FriendRepository
) {
    operator fun invoke(): Observable<List<Friend>> {
        return getCurrentUser()
            .map { it.id }
            .flatMapObservable {
                friendRepository.observeFriendsForUserId(userId = it)
            }
    }
}
class FriendsPresenter(
    private val observeCurrentUserFriends: ObserveCurrentUserFriends,
    private val schedulers: Schedulers
) {
    internal lateinit var target: FriendsTarget // Set externally
    private val compositeDisposable = CompositeDisposable()
    fun onTakeTarget() {
        observeCurrentUserFriends()
            .subscribeOn(schedulers.io())
            .observeOn(schedulers.mainThread())
            .subscribeBy(
                onNext = { target.setFriends(friends = it) },
                onError = { target.errorLoadingFriends() }
            )
            .addTo(compositeDisposable)
    }
    fun onDropTarget() {
        compositeDisposable.clear()
    }
}
Based on your suggestion, I now know how to refactor FriendsPresenter, but if GetCurrentUser is ultimately the class I am trying to convert to a suspend function, how would we get it to play nicely with ObserveCurrentUserFriends? I feel like this is where the suggestions from @icarohs7 and @matejdro might come into play since ObserveCurrentUserFriends is not CoroutineScope aware. Thanks again!
If you converted GetCurrentUser like this:
class GetCurrentUser {
    // A suspend function.
    suspend operator fun invoke(): User {
        // Return current user
    }
}
then you can still use it in your Rx code using GlobalScope.rxSingle { getCurrentUser() }, which returns Single<User> as it did before.
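Applied to the earlier example, the bridge could look roughly like the sketch below, where ObserveCurrentUserFriends keeps its Observable-returning signature while GetCurrentUser becomes a suspend function. The User and Friend data classes and the placeholder bodies are assumptions made only so the sketch runs. It uses the top-level rxSingle builder (I believe this exists from kotlinx-coroutines-rx2 1.3.0 onward); on the version discussed here, GlobalScope.rxSingle { ... } is the equivalent.

```kotlin
import io.reactivex.Observable
import kotlinx.coroutines.rx2.rxSingle

// Assumed minimal models; the real classes are not shown in the thread.
data class User(val id: String)
data class Friend(val name: String)

class GetCurrentUser {
    // Now a suspend function; the body is a placeholder.
    suspend operator fun invoke(): User = User(id = "42")
}

class FriendRepository {
    // Placeholder for a Room or SqlDelight query that returns an Observable.
    fun observeFriendsForUserId(userId: String): Observable<List<Friend>> =
        Observable.just(listOf(Friend(name = "friend-of-$userId")))
}

class ObserveCurrentUserFriends(
    private val getCurrentUser: GetCurrentUser,
    private val friendRepository: FriendRepository
) {
    // Same public signature as before; only the first step changed.
    operator fun invoke(): Observable<List<Friend>> =
        rxSingle { getCurrentUser() } // suspend call wrapped as Single<User>
            .map { it.id }
            .flatMapObservable { friendRepository.observeFriendsForUserId(userId = it) }
}

fun main() {
    val observeFriends = ObserveCurrentUserFriends(GetCurrentUser(), FriendRepository())
    println(observeFriends().blockingFirst()) // [Friend(name=friend-of-42)]
}
```

FriendsPresenter does not need to change in this arrangement, because it still receives an Observable.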
.map { item -> runBlocking { transform(item) } }
I do not recommend doing asynchronous operations within runBlocking in rx operators if you are not using Schedulers.io() or have precise control over the amount of in-flight operations.
As Roman noted, it is better to use the rx coroutine builders.
If you want to use getCurrentUser as map transformation, you can use flatMap with rxSingle:
getUserIdObservable()
    .flatMapSingle { id -> GlobalScope.rxSingle { getCurrentUser(id) } } // Better to introduce scope field
Thanks again. Just one more question.
Why would
.flatMapSingle { GlobalScope.rxSingle { getCurrentUser() } }
be preferred over
.flatMapSingle {
    runBlocking {
        rxSingle { getCurrentUser() }
    }
}
In the second case, the runBlocking block is going to immediately return a Single which will not perform any work until subscription. Aren't both approaches more or less the same?
Also, I just realized that the suggestion to wrap the entire Observable stream along with the consumeEach within a scope.launch block where the scope is MainScope, would technically delay the point of subscription to the Observable by enqueueing that action onto the Main Looper. I came to realize that after reading https://medium.com/@trionkidnapper/launching-a-kotlin-coroutine-for-immediate-execution-on-the-main-thread-8555e701163b.
I believe the scope would have to use Dispatchers.Main.immediate in order to more or less preserve the existing functionality.
Heads up, when trying the recommendation to wrap the Observable within a coroutine while using consumeEach, I discovered https://github.com/Kotlin/kotlinx.coroutines/issues/1008. Hence, I'll hold off until there is a fix.
Why would ... be preferred over ...?
.flatMapSingle {
    runBlocking {
        rxSingle { getCurrentUser() }
    }
}
just does not work as expected and may unconditionally throw CancellationException.
Try to run this code in order to observe it :)
runBlocking may complete before anyone subscribes to the Single.
This is both a misunderstanding of runBlocking and a design flaw in our late-binding mechanism:
runBlocking is intended to be used as "block current thread until its body and all launched coroutines complete". But rxSingle doesn't launch anything; it postpones the actual launch until subscription, thus letting runBlocking complete prematurely.
While it sounds complicated, you can just use the simple rule "runBlocking blocks the thread and rx operators should not block".
Also, I just realized that the suggestion to wrap the entire Observable stream along with the consumeEach within a scope.launch block where the scope is MainScope, would technically delay the point of subscription to the Observable by enqueueing that action onto the Main Looper. I believe the scope would have to use Dispatchers.Main.immediate in order to more or less preserve the existing functionality.
Yes. But you can override the scope's dispatcher if you have to:
myMainScope.rxSingle(Dispatchers.Main.immediate) { // <- may be e.g. Dispatchers.Default as well
I discovered #1008. Hence, I'll hold off until there is a fix.
We will fix it in 1.2. If this is a real blocker for you, you can use your own consumeEach:
suspend inline fun <T> ObservableSource<T>.consumeEach(action: (T) -> Unit) = openSubscription().consumeEach(action)
Try to run this code in order to observe it :)
runBlocking may complete before someone will subscribe on single
You're right, as demonstrated by the following tests:
@Test
fun singleWithRunBlocking() {
    runTest {
        runBlocking {
            rxSingle {
                prefix(number = it)
            }
        }
    }
}
@Test
fun singleWithGlobalScope() {
    runTest {
        GlobalScope.rxSingle {
            prefix(number = it)
        }
    }
}
private fun runTest(singleProducer: (Int) -> Single<String>) {
    println("Start test")
    val numbers = Observable.just(1, 2, 3, 4, 5)
    numbers
        .concatMapSingle { number -> singleProducer(number) }
        .subscribe(
            { result -> println(result) },
            { error -> println(error) }
        )
    Thread.sleep(7000L)
    println("End test")
}
private suspend fun prefix(number: Int): String {
    delay(1000L)
    return "Number: $number"
}
The first test results in:
Start test
kotlinx.coroutines.JobCancellationException: Parent job is Completed; job="coroutine#1":BlockingCoroutine{Completed}@7489362d
End test
And the second test results in:
Start test
Number: 1
Number: 2
Number: 3
Number: 4
Number: 5
End test
We will fix it in 1.2. If this is a real blocker for you, you can use your own
Not a blocker at the moment. Migrating Singles, Maybes, and Completables to suspend functions is more of a nice-to-have effort for now. Glad the fix is already in the works! :)
Thank you all for your suggestions and guidance. This was super insightful. Feel free to close if you wish. Hopefully this will help clear some things up for others who wish to migrate some of their Rx code to coroutines. 🙂