Kotlinx.coroutines: `awaitFirst` function

Created on 5 Jul 2018  ·  15 Comments  ·  Source: Kotlin/kotlinx.coroutines

The core library lacks an equivalent of Promise.race.

I propose a little POC of joinFirst/awaitFirst, or a more generic selectFirst.

What do you think?

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.selects.SelectClause0
import kotlinx.coroutines.experimental.selects.SelectClause1
import kotlinx.coroutines.experimental.selects.select
import kotlin.reflect.KProperty1

fun main(args: Array<String>) = runBlocking {
    val jobs = listOf(launch { delay(3) }, launch { delay(4) }, launch { delay(5) })
    jobs.selectFirst(Job::onJoin) {
        println("$it done")
    }

    val asyncs = listOf(async { 3 }, async { 4 }, async { 5 })
    asyncs.selectFirst(Deferred<Int>::onAwait) { deferred, value ->
        println("$deferred done with result $value")
    }

    println("First result is " + asyncs.awaitFirst())
}

// common functions

suspend fun <E : Job> Iterable<E>.joinFirst(): E = select {
    for (job in this@joinFirst) {
        job.onJoin { job }
    }
}

suspend fun <E : Deferred<R>, R> Iterable<E>.awaitFirst(): R = joinFirst().getCompleted()

// generic functions

suspend fun <E, R> Iterable<E>.selectFirst(selectProperty: KProperty1<E, SelectClause0>, block: (E) -> R): R =
        select {
            for (task in this@selectFirst) {
                selectProperty.get(task).invoke { block(task) }
            }
        }

suspend fun <E, V, R> Iterable<E>.selectFirst(selectProperty: KProperty1<E, SelectClause1<V>>, block: (E, V) -> R): R =
        select {
            for (task in this@selectFirst) {
                selectProperty.get(task).invoke { block(task, it) }
            }
        }
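
For readers on a current release, the same idea can be sketched without the `experimental` packages. This is only an illustration, not the proposal above: `awaitFirst` here is a hypothetical helper written against the stable `select` and `onAwait` API, and it assumes a non-empty collection (a `select` with no clauses would suspend forever).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical helper: suspends until the first Deferred completes and returns its value.
// If the first one to complete has failed, onAwait rethrows its exception here.
suspend fun <T> Iterable<Deferred<T>>.awaitFirst(): T = select {
    for (deferred in this@awaitFirst) {
        deferred.onAwait { it }
    }
}

fun main(): Unit = runBlocking {
    val slow = async { delay(500); "slow" }
    val fast = async { delay(50); "fast" }
    val winner = listOf(slow, fast).awaitFirst()
    println("First result is $winner")
    slow.cancel() // the loser keeps running unless someone cancels it
}
```

Note that the helper does nothing about the remaining tasks; cancelling or checking them is left to the caller.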
use-case needed

All 15 comments

POC combined with #410

fun main(args: Array<String>) = runBlocking {
    val result = job {
        val fetch1 = fork { longRunJob(1) }
        val fetch2 = fork { longRunJob(2) }
        listOf(fetch1, fetch2).awaitFirst()
    }

    println("The winner is $result")
}

suspend fun <R> job(parent: Job? = null, block: suspend Job.() -> R): R {
    val scope = Job(parent)
    try {
        return scope.block()
    } finally {
        scope.cancel()
    }
}

suspend fun <R> Job.fork(block: suspend () -> R): Deferred<R> = async(parent = this) { block() }

suspend fun longRunJob(i: Int): Int = i // TODO

I think it lacks use cases and is not very reliable: who will be responsible for checking the rest of the jobs for exceptions or cancellations? The value of such an extension in a real-world application is unclear as well.

I was never satisfied with the rationale for Promise.race and CompletableFuture.anyOf, especially in the coroutines world (as opposed to the callbacks world), where cancellation and timeouts can be managed externally and propagated automatically.

Let's see whether there is demand for such an extension, though.

Hi @qwwdfsad
You are right.
I propose another operator here; I could not find it in the core library, and hopefully it will be useful.

It is more generic, so awaitFirst becomes trivial.
The idea is: given a collection of Jobs, I want to receive them as they complete.

Here a POC:

fun <E : Job> Iterable<E>.joinChannel(): ReceiveChannel<E> = JoinChannel(this)

private class JoinChannel<E : Job>(jobs: Iterable<E>) : LinkedListChannel<E>() {
    /**
     * Pending jobs count. May be negative in initialization phase
     */
    val _pendingJobCount = atomic(0)
    @Volatile
    var _disposableHandles: List<DisposableHandle>?

    init {
        val disposableHandles =
                jobs.mapNotNull { job ->
                    if (job.isCompleted) {
                        offer(job)
                        null
                    } else {
                        job.invokeOnCompletion { onJobCompletion(job) }
                    }
                }

        if (disposableHandles.isEmpty()) {
            _disposableHandles = null
            close()
        } else {
            _disposableHandles = disposableHandles
            val pending = _pendingJobCount.addAndGet(disposableHandles.size)
            if (pending == 0) close()
        }
    }


    fun onJobCompletion(job: E) {
        try {
            offer(job)
            val pending = _pendingJobCount.decrementAndGet()
            if (pending == 0) {
                _disposableHandles = null
                close()
            }
        } catch (_: Exception) {
            // ignore exception if output channel is closed
        }
    }

    override fun cleanupSendQueueOnCancel() {
        _disposableHandles?.apply {
            _disposableHandles = null
            forEach(DisposableHandle::dispose)
        }
    }
}

Could you please elaborate on why one needs this operator?
What kind of tasks can it be used for, and why is it impossible for the Job itself to emit an item to the channel?

Good catch @qwwdfsad

why it's impossible to emit item to the channel by the Job itself?

Should I do it?
Clearly it is possible, but then the suspension points in the Jobs will not be in tail position.

I prefer a different approach to this problem, i.e. using a regular map/reduce.

suspend fun build(vararg parts: Part): Result =
        parts
                .map { async { buildPart(it) } }
                .joinChannel()
                .map { it.await() }
                .reduce { acc, result -> acc + result }

It is possible to implement this function using something like awaitAll, but then I have to await all the Deferreds before reducing them.
Finally, I am confident that joinChannel is pretty versatile (e.g. with first or find).

I am documenting my "use-case needed" here for future reference.

I have to download a page from n hosts (often two or three). The page is the same on each, so any of them will do, preferably the fastest to retrieve.
The required library returns a result on success or throws an error on failure; sometimes it hangs (HTTPS issue).

I moved the timeout handling outside, so I only have to choose the fastest page.

A solution is:

fun main(args: Array<String>) = runBlocking {
    val job = Job(coroutineContext[Job])
    try {
        val context = coroutineContext + job
        val loaders =
                args
                        .mapAsync(context) { fetch(it) }
                        .toMutableList()

        while (loaders.isNotEmpty()) {
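            // select only identifies the finished loader; the list is mutated afterwards,
            // never while it is being iterated
            val finished = select<Deferred<String>> {
                for (verifier in loaders) {
                    verifier.onJoin { verifier }
                }
            }
            loaders.remove(finished)

            val data: String? =
                    if (finished.isCompletedExceptionally) null
                    else finished.getCompleted()
            if (data != null) {
                println("Found $data")
                return@runBlocking // the fastest page is enough; `finally` cancels the rest
            }
        }

        println("Data not found")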
    } finally {
        job.cancel()
    }
}

fun <T, R> Array<out T>.mapAsync(context: CoroutineContext, transform: suspend CoroutineScope.(T) -> R): List<Deferred<R>> =
        map { async(context) { transform(it) } }

using joinChannel

fun main(args: Array<String>) = runBlocking {
    val job = Job(coroutineContext[Job])
    try {
        val context = coroutineContext + job
        val loaders = args.mapAsync(context) { fetch(it) }

        val data = loaders
                .joinChannel()
                .filterNotNull()
                .firstOrNull()

        if (data != null) println("Found $data")
        else println("Data not found")
    } finally {
        job.cancel()
    }
}

If you have a better way to solve this problem, please give me a tip.
Thank you.
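
For comparison, here is a minimal sketch of the same use case against the current stable API. It is not taken from the thread: `firstSuccessful` and the `*.example` host names are hypothetical, the `fetch` lambda stands in for the real download, and the sketch assumes that `fetch` is cancellable (a blocking call that ignores cancellation would keep the scope from finishing).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical helper: runs `fetch` for every host concurrently and returns the
// first successful result, or null if every attempt fails.
suspend fun <T : Any> firstSuccessful(hosts: List<String>, fetch: suspend (String) -> T): T? =
    supervisorScope {
        // supervisorScope keeps one failed fetch from cancelling its siblings
        val pending = hosts.map { host -> async { fetch(host) } }.toMutableList()
        var result: T? = null
        while (result == null && pending.isNotEmpty()) {
            // Wait for whichever attempt finishes next, whether it succeeded or failed
            val finished = select<Deferred<T>> {
                for (attempt in pending) attempt.onJoin { attempt }
            }
            pending.remove(finished)
            result = try {
                finished.await()
            } catch (e: Exception) {
                ensureActive() // rethrow if the caller itself was cancelled
                null           // otherwise treat this attempt as failed and keep waiting
            }
        }
        pending.forEach { it.cancel() } // stop the slower attempts
        result
    }

fun main(): Unit = runBlocking {
    // The timeout is applied from the outside, as described above
    val page = withTimeoutOrNull(1_000) {
        firstSuccessful(listOf("a.example", "b.example", "c.example")) { host ->
            // Stand-in for a real download: "a" fails, "b" is slow, "c" is fast
            when (host) {
                "a.example" -> throw IllegalStateException("connection refused")
                "b.example" -> { delay(300); "page from $host" }
                else -> { delay(50); "page from $host" }
            }
        }
    }
    println(page ?: "Data not found")
}
```

The list is only mutated after `select` returns, so the loop never removes an element while iterating. Like the loop above, this is quadratic in the number of hosts, which should be acceptable for two or three.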

How about using select to await the first loader?

val data = select<Data> {
    loaders.forEach {
        it.onAwait { it }
    }
}

Hi @elizarov
it is the same as

suspend fun <E : Job> Iterable<E>.joinFirst(): E = select {
    for (job in this@joinFirst) {
        job.onJoin { job }
    }
}

suspend fun <E : Deferred<R>, R> Iterable<E>.awaitFirst(): R = joinFirst().getCompleted()

I rethought my solution, and my first proposal is inefficient for my own use case.
I need something like first { } that does not consider the send order; I am looking for the first completed job.

So my proposal is something like a joinChannel, where deferred tasks are reordered by completion time.

I suspect that this issue is related to #172, but I have to work with already created Deferreds.

@fvasco So, you are basically looking for the following extension:

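fun <T : Job> List<T>.joinChannel(): ReceiveChannel<T> = produce {
    val remaining = toMutableSet()
    while (remaining.isNotEmpty()) {
        // onJoin is a SelectClause0, so its block takes no argument; return the job itself
        val completed = select<T> {
            for (job in remaining) job.onJoin { job }
        }
        // mutate the set only after select has finished iterating over it
        remaining.remove(completed)
        send(completed)
    }
}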

This implementation is O(N^2) but am I getting what you want correctly?
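
A linear alternative can be sketched with completion handlers instead of repeated `select` calls. This is an illustration against the current stable API, not the POC posted earlier in the thread: `inCompletionOrder` is a hypothetical name, and the sketch makes no attempt to dispose of the handlers if the consumer cancels the channel early.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: emits each job when it completes, in completion order.
fun <T : Job> Collection<T>.inCompletionOrder(): ReceiveChannel<T> {
    // Unlimited buffer, so trySend does not fail while the channel is open
    val channel = Channel<T>(Channel.UNLIMITED)
    if (isEmpty()) {
        channel.close()
        return channel
    }
    val remaining = AtomicInteger(size)
    for (job in this) {
        // The handler runs once per job, immediately if the job is already complete
        job.invokeOnCompletion {
            channel.trySend(job)
            if (remaining.decrementAndGet() == 0) channel.close()
        }
    }
    return channel
}

fun main(): Unit = runBlocking {
    val jobs = listOf(300L, 100L, 200L).map { ms -> async { delay(ms); ms } }
    for (done in jobs.inCompletionOrder()) {
        println("completed: ${done.await()}")
    }
}
```

Each job registers exactly one handler, so the cost is one registration and one send per job rather than one `select` over all remaining jobs per completion.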

Yes @elizarov, it is correct.

I solved my task this way: five days ago I wrote a very similar ad hoc implementation, in which I named the remaining list loaders. I currently use this strategy for only three tasks (and I cancel the pending tasks as soon as possible), so it is fine for me.

Eleven days ago I proposed an untested POC with O(N) complexity.

However, I consider this issue closely linked to #172.
To solve that issue we need to parallelize the map/reduce operators; in this issue I proposed mapAsync.
In this issue we need to sort a ReceiveChannel of asynchronous tasks using a select clause; personally I need only onJoin.

My recent question https://github.com/Kotlin/kotlinx.coroutines/issues/172#issuecomment-405851442 is about the link between these issues.
We can solve this task by writing something like:

val urls: List<String> = TODO()
val data = urls
        .asReceiveChannel()
        .mapNotNull(parallelism = urls.size, preserveOrder = false) { fetch(it) }
        .first()

@fvasco Thanks. It is indeed related. I wonder if we should really blow up the parameter lists of all the different operators or, maybe, provide a single extension so that you can write something like:

val urls: List<String> = TODO()
val data = urls
        .asReceiveChannel() // optional? define parallel for lists too? 
        .parallel(urls.size) // order is not preserved by default (and maybe never preserved???) 
        .mapNotNull() { fetch(it) }
        .first()

Good. Dart's Future has wait-any and wait-all methods:
https://api.dartlang.org/stable/2.0.0/dart-async/Future-class.html

And Dart's async, yield, and await for have far _fancier ergonomics_:
https://www.dartlang.org/guides/language/language-tour#asynchrony-support
@elizarov may I suggest supporting the above in Kotlin?

for instance, in Dart,

  await for (var request in requestServer) {
    handleRequest(request);
  }

maybe the equivalent in Kotlin:

  for (var request.await() in requestServer) {
    handleRequest(request)
  }

Or like in Scala:

  for (var request in requestServer) {
    val x = handleRequest(request);
  } yield x

@SolomonSun2010 We've designed Kotlin coroutines for better ergonomics. In Kotlin these examples look like this:

for (request in requestServer) {
    handleRequest(request)
}

It is cleaner and has less boilerplate in Kotlin (no need to explicitly write await if you use suspending functions and channels). As an added bonus, it is not hardcoded into the language (unlike Dart) but is implemented in a library, so we can have an even greater variety of useful constructs without having to release an update to the language. I welcome you to study the coroutines guide to learn about the other features we offer.
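
To make the loop concrete, here is a small self-contained sketch against the current stable API. The names `requestServer` and `handleRequest` come from the example above; their types and bodies are assumptions made for illustration only.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical request handler; it suspends without any explicit await at the call site
suspend fun handleRequest(request: String): String {
    delay(10) // stand-in for real asynchronous work
    return "handled $request"
}

fun main(): Unit = runBlocking {
    val requestServer = Channel<String>()
    launch {
        for (i in 1..3) requestServer.send("request-$i")
        requestServer.close() // closing the channel ends the consumer loop
    }
    // Iterating a channel suspends until the next element arrives
    for (request in requestServer) {
        println(handleRequest(request))
    }
}
```

The plain `for` loop works because channels provide a suspending iterator, which is a library feature rather than dedicated syntax.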

Many people treat Kotlin as a better Java. I appreciate Kotlin's focus on ergonomics.
Hence, some best-practice utilities from popular third-party libraries, such as Guava and Eclipse Collections, could perhaps be absorbed into Kotlin.
https://google.github.io/guava/releases/snapshot-jre/api/docs/
There are utilities such as allAsList(), among others.
Thanks to Kotlin extensions, this is more feasible. @elizarov
By the way, fans of the R language will feel at home with Dispatcher.Android, Dispatcher.XXX, because they are familiar with the dot naming style of data.frame, component.name and so on.

Closing as obsolete
