I'm trying to interop Kotlin Flow with RxJava callers using kotlinx-coroutines-rx2. All consumers of my API are currently RxJava chains, which I'm planning to migrate to coroutines slowly. However, my data is exposed as Flow<T>, which needs to be consumed by these Rx chains. The data comes from Jetpack DataStore. I'm using rxSingle from coroutines-rx2 to wrap the flow into a Single in order to satisfy the caller dependencies. I end up with a function like this:
override fun getData(): Single<Data> = rxSingle { dataStore.data.first() }
Which gets called from chains like this:
repository.getData()
    .flatMap { someNetworkCall(it) }
    .subscribeOn(schedulerProvider.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({ processResult(it) }, { processError(it) })
The threading of this looks like:
1. getData() is called on a background thread from Schedulers.io().
2. rxSingle isn't given a coroutine context, so the dispatcher defaults to Dispatchers.Default.
3. someNetworkCall now runs on the default dispatcher instead of the IO scheduler, which is what you would expect from just reading the Rx chain.
In my app, all the threading is decided at the top layer, where subscribe is called. The thread switching that rxSingle does is not obvious to whoever is working with the API, and the thread injected and used in the bottom layer, where getData lives, might not suit the rest of the Rx chain.
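The thread hop described above can be observed with a small sketch. It assumes RxJava 2 and kotlinx-coroutines-rx2 on the classpath. observeThreads is a hypothetical helper, and the coroutine body is a stand-in for dataStore.data.first() that only records thread names.

```kotlin
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical helper: returns (thread of the coroutine body, thread of the downstream operator).
fun observeThreads(): Pair<String, String> =
    rxSingle { Thread.currentThread().name }                      // no context passed, so Dispatchers.Default is used
        .map { inside -> inside to Thread.currentThread().name }  // downstream operator runs where the Single emits
        .subscribeOn(Schedulers.io())                             // only affects where the subscription starts
        .blockingGet()
```

Both names should point at a Dispatchers.Default worker rather than an Rx IO thread, even though the chain says subscribeOn(Schedulers.io()).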
I'm not sure why rxSingle switches the thread context instead of using the one currently specified by the Rx scheduler. There's probably a good reason for it, but as I see it, my options are:
1. Whenever I use rxSingle to wrap a Flow, I can finish by switching the thread with .observeOn(). This doesn't really satisfy the requirement of being transparent to the caller of the function, as I will just default to an arbitrary thread I decide on, most likely observeOn(schedulers.io()) to be safe. The same getData would then look like:
override fun getData(): Single<Data> = rxSingle(backgroundDispatcher) { dataStore.data.first() }.observeOn(schedulers.io())
2. Pass the dispatcher in from the caller, as in repository.getData(schedulers.io().asCoroutineDispatcher()), and hand it to my rxSingle wrapping function:
override fun getData(dispatcher: CoroutineDispatcher): Single<Data> = rxSingle(dispatcher) { dataStore.data.first() }
Neither option is great, so I'm looking for some advice. Maybe I'm missing something here?
Have you considered explicitly using rxSingle(Dispatchers.Unconfined) in order to let rxSingle continue using the thread provided by the Rx Scheduler?
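A minimal sketch of that suggestion, assuming RxJava 2 and kotlinx-coroutines-rx2. threadOfUnconfinedRxSingle is a hypothetical helper, and the body stands in for the real DataStore read.

```kotlin
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical helper: returns the name of the thread the coroutine body started on.
fun threadOfUnconfinedRxSingle(): String =
    rxSingle(Dispatchers.Unconfined) { Thread.currentThread().name } // starts in the subscribing thread
        .subscribeOn(Schedulers.io())
        .blockingGet()
```

Because the body here never suspends, it runs entirely on the thread that subscribeOn picked.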
Interesting. I actually haven't considered that as I initially dismissed that dispatcher thinking that Unconfined refers to unconfined thread pool, i.e. a dispatcher that creates threads as needed instead of using a thread pool. Now that you pointed this out, I read the docs and I can see it's something completely different. This might be exactly the solution I was looking for.
Looks like Dispatchers.Unconfined is an option, but it's less safe than passing in a dispatcher from the top. The problem with Unconfined is that it only works until the first context switch inside the coroutine. Once the context is switched, we're back to the problem as described, where the continuation of the chain runs on a thread chosen somewhere in the depths of the code rather than at the top layer where the subscribe is.
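To illustrate the caveat, here is a sketch under the same assumptions (RxJava 2, kotlinx-coroutines-rx2). unconfinedBeforeAndAfterSuspension is a hypothetical helper, and delay stands in for any suspension point that resumes on another thread.

```kotlin
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.rx2.rxSingle

// Hypothetical helper: returns thread names captured before and after a suspension.
fun unconfinedBeforeAndAfterSuspension(): Pair<String, String> =
    rxSingle(Dispatchers.Unconfined) {
        val before = Thread.currentThread().name // still the subscribing Rx thread
        delay(50)                                // suspends; resumed by a timer thread
        val after = Thread.currentThread().name  // Unconfined stays on whatever thread resumed it
        before to after
    }
        .subscribeOn(Schedulers.io())
        .blockingGet()
```

The first name should be an Rx IO thread and the second should not, so anything downstream of the Single also leaves the IO scheduler.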
The key difference is that Coroutines/Flow are built with context preservation in mind, whereas Rx is not. For example, even in a pure Rx stream, if you use operators such as delay(), throttle(), interval(), or even flatMap { someObservable.subscribeOn(differentScheduler) }, I believe Rx will end up switching the downstream to a different scheduler even if you apply subscribeOn(Schedulers.io()) to the entire stream.
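The delay() case can be checked with a short sketch, assuming RxJava 2. threadAfterRxDelay is a hypothetical helper name.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the name of the thread that runs the operator after delay().
fun threadAfterRxDelay(): String =
    Single.just(1)
        .delay(10, TimeUnit.MILLISECONDS)     // this overload runs on Schedulers.computation()
        .map { Thread.currentThread().name }  // downstream of delay()
        .subscribeOn(Schedulers.io())         // applied to the whole stream, yet delay() still moves the emission
        .blockingGet()
```

The returned name should belong to the computation pool, not the IO scheduler named in subscribeOn.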
In code that follows some kind of abstraction architecture, e.g., Clean Architecture or a Repository Pattern, where interfaces are programmed against, why should my Presenter/View Model have to know to apply subscribeOn(Schedulers.io()) when subscribing to my Use Case or Repository interface type? What if the repository implementation switches from being File IO based to an in-memory implementation? Does my Presenter or View Model have to know that Schedulers.io() is no longer required?
What I like about Coroutines and its context preservation behavior is that it becomes an implementation detail of the Use Case or Repository as to which Dispatcher the suspend functions run on. The Presenter/View Model can simply "trust" the implementation without having to know about Dispatchers.IO or Dispatchers.Default, for example. By the time the call returns to the Presenter/View Model, Coroutines knows to resume on the default Dispatcher of the Presenter/View Model's CoroutineScope.
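A small sketch of that behavior, assuming only kotlinx-coroutines-core. loadDataThread stands in for a hypothetical repository function, and runBlocking stands in for the Presenter/View Model's scope.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical repository function: the use of Dispatchers.IO is hidden from the caller.
suspend fun loadDataThread(): Thread =
    withContext(Dispatchers.IO) { Thread.currentThread() }

// Returns (caller resumed on its own thread, repository work ran on a different thread).
fun callerKeepsItsThread(): Pair<Boolean, Boolean> = runBlocking {
    val before = Thread.currentThread()
    val inside = loadDataThread()       // hops to an IO worker internally
    val after = Thread.currentThread()  // back in the caller's context
    (before === after) to (inside !== before)
}
```

The caller never names a dispatcher, yet it resumes in its own context once the suspend function returns.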
Anyhow, this probably does not directly answer your question, but maybe it clarifies why these coroutines-rx* APIs behave more like Coroutines than Rx. 🙂
I can totally see your point about the implementation details, and it's partially why I'm undertaking migrating a mature app from Rx to coroutines. But considering how long the migration will take, having a decent option for the in-between state is essential.