Kotlinx.coroutines: [Question] How to move from RxJava to Kotlin coroutines - simple presenter example

Created on 24 Jul 2019 · 1 Comment · Source: Kotlin/kotlinx.coroutines

I am having trouble rewriting the logic of a presenter method that can be rerun from the UI at any time. What I mean is that I want to cancel the currently running job and run it again. It's a very simple example, but I hope you get my point:

    private val disposables = CompositeDisposable()
    private val allDisposables = CompositeDisposable()
    private val someBooleans = PublishProcessor.create<Boolean>()

    fun repeatableAndCancelableTask() {
        //clear current work
        disposables.clear()
        //hard, hard work on observing data
        Observable
                .just(true)
                .mergeWith(someBooleans.toObservable()
                        .flatMap {
                            Observable.just(it).delay(10, TimeUnit.SECONDS)
                        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({
                    print("great: $it")
                }, {
                    print("not great: $it")
                })
                .addTo(disposables)
                .addTo(allDisposables)
    }

    fun clear(){
        allDisposables.clear()
    }

Simple: get the current state, merge it with possible updates, and observe the result until it is disposed, either by calling the same method again or by calling clear().

someBooleans is for possible updates from any source.

My question is: how can the same behaviour be achieved with coroutines? I tried combining RxJava's merge with a conversion via asFlow, but I ran into trouble disposing/cancelling the flow's job and rerunning it in combination with MainScope, and I'm stuck.

I didn't find any similar case anywhere on the web.
Could anyone propose a solution with coroutines?
Thanks in advance!

question

Most helpful comment

Hi, a rough equivalent would be the following (assuming you already have a MainScope somewhere):

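    import io.reactivex.processors.PublishProcessor
    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlinx.coroutines.reactive.asFlow

    // uiScope: a UI scope with the main dispatcher, cancelled by the presenter's
    // owner instead of calling "clear"
    class Presenter(private val uiScope: CoroutineScope) {

        // same source of updates as in the question
        private val someBooleans = PublishProcessor.create<Boolean>()
        private var previouslyLaunchedWork: Job? = null

        fun repeatableAndCancelableTask() {
            previouslyLaunchedWork?.cancel() // maybe join it as well
            previouslyLaunchedWork = someBooleans
                .asFlow()
                .onEach { delay(10_000) } // delay every update by 10 seconds (stands in for delayEach(10s))
                .onStart { emit(true) } // emit the current state before any update
                .flowOn(Dispatchers.IO) // run everything above on the IO dispatcher
                .onEach { println("great: $it") }
                .catch { println("not great: $it") }
                .launchIn(uiScope) // collect in uiScope and return the Job
        }
    }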

When uiScope is cancelled, all of its children (the implicit "allDisposables") are cancelled as well.
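A minimal sketch of that cancellation behaviour, under a few assumptions: `cancelScopeAndReport` is a hypothetical helper that is not part of the original answer, and `Dispatchers.Default` stands in for the main dispatcher so the snippet can run outside Android. It launches two jobs in one scope, cancels the scope, and reports whether each job was cancelled.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper for illustration: launches two jobs in a fresh scope,
// cancels the scope, and reports whether each job ended up cancelled.
fun cancelScopeAndReport(): List<Boolean> = runBlocking {
    // Assumption: Dispatchers.Default replaces the main dispatcher of a real UI scope.
    val uiScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Two independent pieces of work that never finish on their own.
    val first = uiScope.launch { awaitCancellation() }
    val second = uiScope.launch { awaitCancellation() }

    uiScope.cancel()       // plays the role of allDisposables.clear()
    joinAll(first, second) // wait until both children have finished cancelling

    listOf(first.isCancelled, second.isCancelled)
}

fun main() {
    println(cancelScopeAndReport()) // prints [true, true]
}
```

No per-job bookkeeping is needed here: the scope's Job is the parent of everything launched in it, so a single cancel() reaches every child.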
Keeping mutable state (previouslyLaunchedWork) is not always the best solution, though; for example, the method can be rewritten with switchMap (in both RxJava and Flow; current Flow versions name the operator flatMapLatest).
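One way the switchMap idea could look with Flow is sketched below. This is an illustration under stated assumptions, not the answer author's code: `RestartablePresenter`, `restarts`, `onValue` and `runRestartDemo` are hypothetical names, `updates` stands in for `someBooleans.asFlow()`, and `flatMapLatest` is used as the Flow counterpart of switchMap. Every call to `repeatableAndCancelableTask()` emits a restart signal, and `flatMapLatest` cancels the previous inner flow before starting a new one, so no `Job` field is needed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Hypothetical presenter for illustration; `updates` stands in for someBooleans.asFlow().
@OptIn(ExperimentalCoroutinesApi::class)
class RestartablePresenter(
    uiScope: CoroutineScope,
    updates: Flow<Boolean>,
    onValue: (Boolean) -> Unit,
) {
    // Each emission means "restart the work". replay = 1 keeps a signal that
    // arrives before the collector below has subscribed.
    private val restarts = MutableSharedFlow<Unit>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )

    init {
        restarts
            // A new signal cancels the previous inner flow and starts a fresh one.
            .flatMapLatest { updates.onStart { emit(true) } }
            .onEach { onValue(it) }
            .catch { println("not great: $it") }
            .launchIn(uiScope)
    }

    fun repeatableAndCancelableTask() {
        restarts.tryEmit(Unit)
    }
}

// Hypothetical demo: triggers the task twice and returns the values observed.
fun runRestartDemo(): List<Boolean> = runBlocking {
    val seen = mutableListOf<Boolean>()
    val job = Job()
    // Assumption: runBlocking's context replaces the main dispatcher here.
    val uiScope = CoroutineScope(coroutineContext + job)
    val neverUpdates = flow<Boolean> { awaitCancellation() } // no updates arrive

    val presenter = RestartablePresenter(uiScope, neverUpdates) { seen += it }
    presenter.repeatableAndCancelableTask()
    delay(50) // let the first run emit its initial value
    presenter.repeatableAndCancelableTask()
    delay(50) // let the restarted run emit its initial value

    job.cancelAndJoin() // the presenter owner cancelling the scope
    seen.toList()
}

fun main() {
    println(runRestartDemo()) // prints [true, true]
}
```

The trade-off is that restart signals are conflated: several calls made before the collector catches up collapse into a single restart, which seems acceptable when only the latest run matters.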

