Currently there is a way to create a CoroutineDispatcher from an RxJava Scheduler, which is great, but it would also be convenient to be able to do the reverse. RxJava lets you override the schedulers returned by the Schedulers singleton, so a dispatcher-backed scheduler could be installed for the whole system.
Here is a sample implementation:
class DispatcherBackedScheduler(private val dispatcher: CoroutineDispatcher) : Scheduler() {
    override fun createWorker(): Worker = DispatcherBackedWorker(dispatcher)

    private class DispatcherBackedWorker(private val dispatcher: CoroutineDispatcher) : Worker(), CoroutineScope {
        private val job = SupervisorJob()
        override val coroutineContext: CoroutineContext = EmptyCoroutineContext + job
        override fun isDisposed(): Boolean = !job.isActive
        override fun dispose() = job.cancel()
        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable =
            if (delay <= 0) {
                dispatcher.dispatch(coroutineContext, run)
                Disposables.disposed()
            } else {
                launch(dispatcher) {
                    kotlinx.coroutines.delay(unit.toMillis(delay))
                    run.run()
                }.asDisposable()
            }
    }
}

private fun Job.asDisposable(): Disposable = object : Disposable {
    override fun isDisposed(): Boolean = !isActive
    override fun dispose() = cancel()
}
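To illustrate the "overridden for the whole system" part, here is a minimal sketch assuming RxJava 2, where the override hooks live on `RxJavaPlugins`. The function name `installSchedulers` is hypothetical and not part of any library; it takes plain `Scheduler` instances so it does not depend on the sample class above.

```kotlin
import io.reactivex.Scheduler
import io.reactivex.plugins.RxJavaPlugins

// Hypothetical helper: routes Schedulers.io() and Schedulers.computation()
// to the given schedulers by installing RxJavaPlugins handlers.
fun installSchedulers(io: Scheduler, computation: Scheduler) {
    // Each handler receives the default scheduler and returns the one to use instead.
    RxJavaPlugins.setIoSchedulerHandler { io }
    RxJavaPlugins.setComputationSchedulerHandler { computation }
}
```

With the sample above, a call could look like `installSchedulers(DispatcherBackedScheduler(Dispatchers.IO), DispatcherBackedScheduler(Dispatchers.Default))`. `RxJavaPlugins.reset()` removes the handlers again, which is useful in tests.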
@ansman
Should it be a CoroutineScope.asScheduler() extension instead, like below?
private fun Job.asDisposable(): Disposable = object : Disposable {
    override fun isDisposed(): Boolean = !isActive
    override fun dispose() = cancel()
}

fun CoroutineScope.asScheduler(): Scheduler {
    return ScopeBackedScheduler(this)
}

class ScopeBackedScheduler(private val scope: CoroutineScope) : Scheduler() {
    override fun createWorker(): Worker = ScopeBackedWorker(scope)

    private class ScopeBackedWorker(private val scope: CoroutineScope) : Worker() {
        override fun isDisposed(): Boolean = !scope.isActive
        override fun dispose() = scope.cancel()
        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            if (isDisposed) {
                return Disposables.disposed()
            }
            val job = scope.launch {
                if (delay > 0) {
                    delay(unit.toMillis(delay))
                }
                run.run()
            }
            return job.asDisposable()
        }
    }
}
I just want the scheduler's lifecycle to match the Activity's, like below:
class MainActivity : AppCompatActivity(), CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job

    private lateinit var job: Job

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        job = Job()
        RxView.onClick(btnABC)
            .observeOn(this.asScheduler()) // The stream will be completed when `job.cancel()` is called in `onDestroy()`
            .subscribe(...)
    }

    override fun onDestroy() {
        job.cancel()
        super.onDestroy()
    }
}
@ansman
kotlinx-coroutines-rx2 is interesting.
It also adds some excellent features (a new backpressure strategy), so we can use it directly without writing a custom CoroutineDispatcherScheduler!
But be careful when using it in production.
@Guang1234567 Your implementation of ScopeBackedScheduler has several problems:
• It can only handle a single dispatched job at a time
• A worker has a shorter lifespan than a scope so you can't cancel the scope when disposing the worker
Also, you never dispose your click observer.
@ansman
Thanks for your reply.
I just fixed the problems following your suggestions:
fun CoroutineScope.asScheduler(): Scheduler {
    return ScopeBackedScheduler(this)
}

class ScopeBackedScheduler(private val scope: CoroutineScope) : Scheduler() {
    override fun createWorker(): Worker = ScopeBackedWorker(scope)

    private class ScopeBackedWorker(private val scope: CoroutineScope) : Worker() {
        private val job = SupervisorJob(scope.coroutineContext[Job])
        override fun isDisposed(): Boolean = !job.isActive
        override fun dispose() = job.cancel()
        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            if (isDisposed) {
                return Disposables.disposed()
            }
            return scope.launch(job) {
                if (delay > 0) {
                    delay(unit.toMillis(delay))
                }
                run.run()
            }.asDisposable()
        }
    }
}
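The parent/child job relationship this version relies on can be checked in isolation. The following is a minimal sketch using only kotlinx.coroutines, with no RxJava involved; the names `workerA`, `workerB`, and `childJobDemo` are hypothetical stand-ins for two workers created from the same scope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive

// Returns: [scope active after disposing A, B active after disposing A,
//           B active after cancelling the scope]
fun childJobDemo(): List<Boolean> {
    // Stand-in for the Activity's scope.
    val scope = CoroutineScope(Job())

    // One child job per worker, created the same way as in ScopeBackedWorker.
    val workerA = SupervisorJob(scope.coroutineContext[Job])
    val workerB = SupervisorJob(scope.coroutineContext[Job])

    // "Disposing" one worker cancels only its own job.
    workerA.cancel()
    val scopeAfterDispose = scope.isActive
    val workerBAfterDispose = workerB.isActive

    // Cancelling the scope propagates to the remaining child job.
    scope.cancel()
    return listOf(scopeAfterDispose, workerBAfterDispose, workerB.isActive)
}

fun main() {
    println(childJobDemo())
}
```

Cancelling a child never cancels its parent, while cancelling the parent cancels every child, which is why a worker can be disposed without affecting the scope or sibling workers.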
It solves several problems:
The job's lifespan matches the worker's. Because it is a child job of the scope, we can cancel the job instead of cancelling the scope when disposing the worker.
Are there any intentions to get this finished?
I'm happy to provide a pull request as long as someone from the Coroutines team approves the idea
There is a PR #1923 in review, close to being ready to merge.