kotlin version: 1.3.61
kotlinx.coroutines version: 1.3.3
Hi, I found that PublisherAsFlow ignores the CoroutineContext when created. It always uses an EmptyCoroutineContext instead.
private class PublisherAsFlow<T : Any>(
private val publisher: Publisher<T>,
capacity: Int
) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
// Just ignore the context, use EmptyCoroutineContext for ChannelFlow
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
PublisherAsFlow(publisher, capacity)
...
}
So if I call flowOn on it, nothing happens.
Flowable.create<Int>({
... // run on ui thread, io thread expected!!!
}, ...)
.asFlow()
.flowOn(Dispatchers.IO) // will be ignored!!!
.launchIn(mainScope)
I don't think it's possible for the flow dispatcher to be used to affect the thread that an Rx stream is subscribed on, at least not as a reasonable default. PublisherAsFlow just subscribes to a stream by forwarding all emissions to a channel. The only way to do what you're trying to do is to use Rx APIs to configure the scheduler.
If you really need to, you could write an operator that wraps asFlow and creates a Scheduler that delegates to the coroutine dispatcher, but that feels dangerously "magical" and it would be much simpler to just call subscribeOn explicitly.
I can use Rx schedulers to switch threads. But if I get a flow from an API that converts a Flowable internally, and I just want to run it on a specific dispatcher with flowOn, it won't behave as expected.
If calling flowOn on a PublisherAsFlow instance to switch threads is not supported, just throw an IllegalStateException to make that clear.
How would the library detect that? The implementation of flowOn would have to check if the type of the upstream Flow is PublisherAsFlow, but that type is defined in a different module that the core library doesn't depend on. And even if you could check the type, it would only catch the simplest case where you call flowOn immediately after asFlow(), and would miss any chain with intermediate operators (e.g. .asFlow().map { it.toString() }.flowOn()).
That said, this isn't really a problem with the reactive integration – the same thing comes up if you call flowOn() twice in a row since the second one won't do anything.
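To illustrate the double-flowOn point, here is a minimal sketch. The dispatcher names "first" and "second" and the helper upstreamThreadName are made up for this example. The flow body should report a thread whose name starts with "first", since the context of the flowOn closest to the source takes precedence when the two calls are fused.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the name of the thread the flow body ran on.
fun upstreamThreadName(): String {
    // Two single-threaded dispatchers with recognizable thread names.
    val first = Executors.newSingleThreadExecutor { r -> Thread(r, "first") }.asCoroutineDispatcher()
    val second = Executors.newSingleThreadExecutor { r -> Thread(r, "second") }.asCoroutineDispatcher()
    try {
        return runBlocking {
            flow { emit(Thread.currentThread().name) }
                .flowOn(first)   // applies to the flow body
                .flowOn(second)  // has no upstream left to move
                .first()
        }
    } finally {
        // Shut the executors down so the JVM can exit.
        first.close()
        second.close()
    }
}

fun main() {
    // The name may carry a coroutine debug suffix, so only the prefix is meaningful.
    println(upstreamThreadName())
}
```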
I see your point though, it just seems like a really hard problem to solve in a general way. I would expect good libraries to allow you to configure the schedulers being used if it's hiding the Rx types from you.
A simple implementation of an operator that sets the subscription scheduler based on the context might look like this:
@UseExperimental(ExperimentalCoroutinesApi::class)
fun <T : Any> Flowable<T>.asFlowWithScheduler(): Flow<T> = flow {
val flowable = this@asFlowWithScheduler
val upstream = (coroutineContext[ContinuationInterceptor] as? CoroutineDispatcher)
// RxJava analog to Unconfined is to just not specify a scheduler.
?.takeUnless { it == Dispatchers.Unconfined }
?.let { dispatcher -> Schedulers.from(dispatcher.asExecutor()) }
?.let { scheduler -> flowable.subscribeOn(scheduler) }
?: flowable
emitAll(upstream.asFlow())
}
The Rx code is invoked in the upstream of PublisherAsFlow, so changing the context in which the upstream is invoked should affect the Rx code being called, right?
I just wonder why PublisherAsFlow ignores the context passed in from flowOn. If it accepted that context, it could affect the upstream by changing the thread on which the upstream is subscribed.
But it just ignores the context and uses EmptyCoroutineContext instead. See the source code here.
private class PublisherAsFlow<T : Any>(
private val publisher: Publisher<T>,
capacity: Int
) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
PublisherAsFlow(publisher, capacity)
...
}
When we call flowOn, it invokes create, which simply creates a new Flow instance without updating the context. That's weird. Calling flowOn on a PublisherAsFlow is really confusing.
The solution seems to be simple: add a parameter context: CoroutineContext to PublisherAsFlow, use context to initialize the super class ChannelFlow, and, when create is called, pass the context into the constructor of PublisherAsFlow.
private class PublisherAsFlow<T : Any>(
context: CoroutineContext,
private val publisher: Publisher<T>,
capacity: Int
) : ChannelFlow<T>(context, capacity) {
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
PublisherAsFlow(context, publisher, capacity)
...
}
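The difference between the two create implementations can be seen in a toy model that needs only the Kotlin standard library. Everything here (FakeDispatcher, ToyChannelFlow, DroppingFlow, ForwardingFlow) is a hypothetical stand-in, not the real kotlinx.coroutines classes. It only shows how a context passed to create is either lost or kept, and says nothing about whether the real fix is this simple.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical stand-in for a dispatcher: just a named context element.
class FakeDispatcher(val name: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<FakeDispatcher>
}

// Toy model of ChannelFlow: holds a context and rebuilds itself through create().
abstract class ToyChannelFlow(val context: CoroutineContext) {
    protected abstract fun create(context: CoroutineContext): ToyChannelFlow

    // Toy model of flowOn: merge the requested context, then re-create the flow.
    fun flowOn(requested: CoroutineContext): ToyChannelFlow = create(requested + context)

    fun dispatcherName(): String = context[FakeDispatcher]?.name ?: "none"
}

// Mirrors the current shape: create() ignores the context it receives.
class DroppingFlow : ToyChannelFlow(EmptyCoroutineContext) {
    override fun create(context: CoroutineContext): ToyChannelFlow = DroppingFlow()
}

// Mirrors the proposed shape: create() forwards the context to the constructor.
class ForwardingFlow(context: CoroutineContext = EmptyCoroutineContext) : ToyChannelFlow(context) {
    override fun create(context: CoroutineContext): ToyChannelFlow = ForwardingFlow(context)
}

fun main() {
    println(DroppingFlow().flowOn(FakeDispatcher("IO")).dispatcherName())   // prints "none"
    println(ForwardingFlow().flowOn(FakeDispatcher("IO")).dispatcherName()) // prints "IO"
}
```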
Maybe I could have a try later on.
Good catch. The fix would not be so trivial, though. It is way more complicated than that.
As a work-around, you can use the following trick to convert Dispatchers.IO (or any other dispatcher for that matter) to an Rx Scheduler: Schedulers.from(Dispatchers.IO.asExecutor()), so then you can write your code in the following way:
Flowable.create<Int>({
... // runs on IO thread now!
}, ...)
.subscribeOn(Schedulers.from(Dispatchers.IO.asExecutor())) // <--- !!!
.asFlow()
.launchIn(mainScope)