Kotlinx.coroutines: Introduce awaitAll and joinAll extensions for collections of Deferreds/Jobs

Created on 29 Nov 2017 · 9 Comments · Source: Kotlin/kotlinx.coroutines

joinAll is a simple map { it.join() }, while awaitAll needs a slightly more involved implementation than a simple map { it.await() }: it should wait on all of the deferreds in the collection and fail as soon as any one of them fails.
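Current kotlinx.coroutines releases ship an awaitAll with this fail-fast behavior. The following is a minimal sketch of the difference from a sequential map { it.await() }; the helper name failureSurfacedEarly and the delays are made up for illustration, and supervisorScope is used only so that the failing child does not cancel its sibling.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns true if `slow` was still running when the failure surfaced.
suspend fun failureSurfacedEarly(
    awaitBoth: suspend (List<Deferred<Int>>) -> List<Int>
): Boolean = supervisorScope {
    val slow = async { delay(500); 1 }
    val failing = async<Int> { delay(50); throw IllegalStateException("boom") }
    try {
        awaitBoth(listOf(slow, failing))
        false // not reached: `failing` always throws
    } catch (e: IllegalStateException) {
        // Record whether `slow` is still active, then cancel it so the scope can finish.
        slow.isActive.also { slow.cancel() }
    }
}

fun main(): Unit = runBlocking {
    // awaitAll fails as soon as `failing` fails, while `slow` is still running.
    println(failureSurfacedEarly { it.awaitAll() })               // true
    // Sequential await only reaches `failing` after `slow` has completed.
    println(failureSurfacedEarly { ds -> ds.map { it.await() } }) // false
}
```

With the sequential version, the caller learns about the failure only after waiting out every deferred that precedes the failing one in the list.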

enhancement

All 9 comments

joinAll is simple map { it.join() }

onEach should be considered instead, to avoid copying the collection.
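A minimal sketch of that suggestion, assuming a hypothetical name joinEach (chosen so it does not clash with the library's own joinAll): map would allocate a new List<Unit>, whereas the stdlib onEach returns the receiver itself.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: joins every job and returns the same collection, with no copy.
// onEach is an inline stdlib function, so the suspending join() call is allowed here.
suspend fun Iterable<Job>.joinEach(): Iterable<Job> = onEach { it.join() }

fun main(): Unit = runBlocking {
    val jobs = List(3) { i -> launch { delay(10L * i) } }
    val same = jobs.joinEach()
    println(same === jobs)               // true: the receiver is returned as-is
    println(jobs.all { it.isCompleted }) // true: every job has been joined
}
```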

Related: I wrote this extension function which I use all the time:

import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope

/**
 * Performs [block] concurrently on each item in the iterable (with [async]), then awaits each deferred value.
 *
 * The coroutines are launched in the current coroutine context (i.e. the context returned
 * from [coroutineContext]), which [coroutineScope] inherits.
 */
suspend fun <T, R> Iterable<T>.mapAsync(block: suspend (T) -> R): List<R> =
    coroutineScope { map { async { block(it) } }.map { it.await() } }

edit: perhaps a more appropriate name would be mapConcurrent

@luisrayas3 It would also be useful if the concurrent/parallel map supported limited parallelism. The idea is to keep the name map but add an additional parallelism parameter. This is covered in #172.
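One way such a limit could look, as a sketch only and not the design from #172: the name mapConcurrent is borrowed from the comment above, and the Semaphore-based approach is an assumption of this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: at most `parallelism` invocations of `block` run at once.
suspend fun <T, R> Iterable<T>.mapConcurrent(
    parallelism: Int,
    block: suspend (T) -> R
): List<R> = coroutineScope {
    val permits = Semaphore(parallelism)
    // All coroutines start eagerly, but each waits for a permit before running `block`.
    map { item -> async { permits.withPermit { block(item) } } }.awaitAll()
}

fun main(): Unit = runBlocking {
    // Plain counters are enough here because runBlocking confines everything to one thread.
    var running = 0
    var peak = 0
    val squares = (1..10).mapConcurrent(parallelism = 3) { n ->
        running++
        peak = maxOf(peak, running)
        delay(20)
        running--
        n * n
    }
    println(squares)   // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
    println(peak <= 3) // true: the limit was never exceeded
}
```

Results keep the input order because awaitAll returns values in the order of the deferreds, regardless of which one finishes first.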

Let me also add an idea from Brian Parma on the public Slack: something like asCompleted for a list of deferreds, similar to Python's as_completed: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed

Some possible implementations:

// Pseudocode: onEach/flatMap with a parallelism parameter are not existing APIs (cf. the proposal in #172).
fun Iterable<Job>.joinAll(): Iterable<Job> = onEach(parallelism = 1) { it.join() }

fun <E> Iterable<Deferred<E>>.awaitAll(): Iterable<E> = flatMap(parallelism = 1)

fun <E> ReceiveChannel<Deferred<E>>.asCompleted(): ReceiveChannel<Deferred<E>> = onEach(parallelism = MAX) { it.join() }
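For comparison, the asCompleted idea can be approximated with APIs that exist today. This is a sketch under assumptions: the name asCompletedFlow is hypothetical, it works on a Collection rather than a ReceiveChannel, and unlike Python's as_completed it emits the results rather than the deferreds themselves.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList

// Hypothetical helper: emits each result as soon as its deferred completes.
fun <T> Collection<Deferred<T>>.asCompletedFlow(): Flow<T> = channelFlow {
    for (deferred in this@asCompletedFlow) {
        // One waiter per deferred; whichever finishes first sends first.
        launch { send(deferred.await()) }
    }
}

fun main(): Unit = runBlocking {
    val deferreds = listOf(300L, 100L, 200L).map { ms -> async { delay(ms); ms } }
    println(deferreds.asCompletedFlow().toList()) // [100, 200, 300]
}
```

If any deferred fails, its await() throws inside the flow and collection fails with that exception.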

I made a simple implementation of the mapAsync method. There are some problems with how the contexts are defined, but it works fine for my cases: https://gist.github.com/fogone/a0ffa2432fb8e87f1ea007add3d402aa

This is great!

It is possible, though, to implement this while preserving the arity of the different operations and the types they return, without coalescing those types into a list. How it is done is not pretty (https://github.com/arrow-kt/arrow/blob/master/modules/core/arrow-syntax/src/main/kotlin/arrow/syntax/applicative/applicative.kt#L256), but Arrow users can already do this without casting:

import arrow.effects.*

data class Joined<A, B, C>(val a: A, val b: B, val c: C)

val op1: DeferredK<A> = async { ... }.k()
val op2: DeferredK<B> = async { ... }.k()
val op3: DeferredK<C> = async { ... }.k()

val results: DeferredK<Joined<A, B, C>> =
  DeferredK.applicative().map(op1, op2, op3, { (a, b, c) ->
      Joined(a, b, c)
  })

While traversing and sequencing over a List<Deferred<A>> where A is fixed is useful, I think it's more useful to preserve type information when joining parallelizable results, as you often launch N parallel operations that each return a differently typed response. I'm not sure whether this is within the design space of async / await, but as a user I would certainly like that feature.

@raulraja We try to keep kotlinx.coroutines idiomatic with respect to the Kotlin stdlib. We don't plan to have any multiple-arity overloads. The idiomatic way to write your code is:

data class Joined<A, B, C>(val a: A, val b: B, val c: C)

val op1: Deferred<A> = async { ... }
val op2: Deferred<B> = async { ... }
val op3: Deferred<C> = async { ... }

val results = Joined(op1.await(), op2.await(), op3.await())