Kotlinx.coroutines: Customizable coroutine behaviour on scope termination

Created on 2 Apr 2019  路  5Comments  路  Source: Kotlin/kotlinx.coroutines

Currently this library allows to define how a coroutine have to start (using start parameter) and its dependency fail behavior (using jobs).
According to structured concurrency it assumes that all sub-coroutines have to join on scope termination.

In my experience this decision does not fit all use case, for some async or produce it is preferable to cancel a coroutine instead of join.
So I wish to consider a parameter to customize the coroutine's behavior on scope termination.

This proposal does not conflict with structured concurrency.

In the example code

    val secret: String? = null
    val userDeferred = async(start = CoroutineStart.LAZY) { loadUser() }

    if (secret != null) {
        require(secret == userDeferred.await().secret)
    }

the strusctured concurrency impose the implicit join of all children.

    val secret: String? = null
    val userDeferred = async(start = CoroutineStart.LAZY) { loadUser() }
    try {
        if (secret != null) {
            require(secret == userDeferred.await().secret)
        }
    } finally {
        userDeferred.join()
    }

So the try-finally is implicit, but unfortunately the first example hangs indefinitely, moreover we MUST write the code

    val secret: String? = null
    val userDeferred = async(start = CoroutineStart.LAZY) { loadUser() }
    try {
        if (secret != null) {
            require(secret == userDeferred.await().secret)
        }
    } finally {
        userDeferred.cancel()
    }

Structured concurrency should avoid to handle join manually.

Proposal

My proposal is to consider a parameter to customize the behavior on scope termination, ie: default, join, cancel.

Ie:

    val secret: String? = null
    val userDeferred = async(start = CoroutineStart.LAZY, onScopeTermination = ScopeTermination.CANCEL) { loadUser() }
    try {
        if (secret != null) {
            require(secret == userDeferred.await().secret)
        }
    } finally {
        userDeferred.join()
    }

Alternatives

This issue was already solved for ReceiveChannel using the consume function, we can copy the same block for Deferred.

inline fun <E, R> Deferred<E>.consume(block: (Deferred<E>) -> R): R {
    var cause: Throwable? = null
    try {
        return block(this)
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        @Suppress("DEPRECATION")
        cancel(cause)
    }
}

So the first example becomes:

    val secret: String? = null
    async(start = CoroutineStart.LAZY) { loadUser() }.consume { userDeferred ->
        if (secret != null) {
            require(secret == userDeferred.await().secret)
        }
        userDeferred.cancel()
    }

We have to introduce a nested block, similar to use function that mimic the try-with-resource block.

Future improvements:

Introducing a new termination behavior allow us to define a similar API to avoid the nested block

fun CoroutineScope.cancelOnScopeTermination(job: Job) {
    launch(start = CoroutineStart.LAZY, onScopeTermination = ScopeTermination.JOIN) {
        job.cancel()
    }
}
    val secret: String? = null
    val userDeferred = async(start = CoroutineStart.LAZY) { loadUser() }
    cancelOnScopeTermination(userDeferred)

    if (secret != null) {
        require(secret == userDeferred.await().secret)
    }

This requires an extra allocation and miss of exception handling, but I consider scope finalizers a future enhancement.

What do you think about?

design

Most helpful comment

However, I see the problem here. You want to cancel your async on scope exit even if it was already _started_. This is indeed a use-case for cancellation on scope exit that is orthogonal to whether it is lazy or not, already started or not.

All 5 comments

Analogously with structured programming, this library deals with common common pattern.

Function invocation become async/await
Object message passing is pending #87
delegation in unplanned KT-20414
when become a select
for loop is pending #328
collection operators is pending #1022

try-catch-finally looks pretty uncovered to me.
The suspending version do not work like the structured one.

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    println("Start")
    runBlocking {
        try {
            coroutineScope {
                launch { TODO() }
            }
        } catch (e: Exception) {
            e.printStackTrace()
        } finally {
            launch { println("Done!") }
        }
    }
    println("End")
}

Thanks for this write-up. This is an interesting use-case explaining the problem of scope termination for lazy coroutines.

I don't like the solution with onScopeTermination parameter because it does not ready scale do different coroutine builder that might potentially need the same handling cancelOnScopeTermination solution looks more promising, but it suffers from the non-atomicity, e.g. if something goes wrong between async and onScopeTermination it still hangs.

I actually think that we can simply adjust existing behavior of lazy coroutines on scope termination, that is make cancellation of non-started lazy coroutines on scope termination a _default_ behavior and do not provide any other configuration. You've showed a good use-case for a combination of CoroutineStart.LAZY and ScopeTermination.CANCEL but

  • Are there any use-cases for CoroutineStart.DEFAULT and ScopeTermination.CANCEL. Do we need to support it?
  • Are there any use-cases for CoroutineStart.LAZY and some other approach to non-started coroutines, but cancelling them?

@elizarov

Are there any use-cases for CoroutineStart.DEFAULT and ScopeTermination.CANCEL. Do we need to support it?

The following code is production code, we have to get a value from a Channel or request a fresh value.

val value = channel.poll() ?: run {
    val requestDeferred = async { requestValue() }

    try {
        select<Int> {
            channel.onReceive { it }
            requestDeferred.onAwait { it }
        }
    } finally {
        requestDeferred.cancel()
    }
}

However paying some extra allocations it is possible to rewrite it using lazy async.

making it async(start = LAZY) does not do any extra allocations.

However, I see the problem here. You want to cancel your async on scope exit even if it was already _started_. This is indeed a use-case for cancellation on scope exit that is orthogonal to whether it is lazy or not, already started or not.

Was this page helpful?
0 / 5 - 0 ratings