joinAll is a simple map { it.join() }, while awaitAll needs a slightly more involved implementation than a simple map { it.await() }. It should actually wait on all of the deferreds in the collection and fail as soon as any one of them fails.
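To illustrate the difference, here is a minimal sketch of a fail-fast await built on select. The name awaitAllFailFast is hypothetical and this is not a proposed library implementation; it rescans the pending set on every iteration, so it is only meant to show the behaviour described above.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical helper: waits for every deferred, but fails as soon as ANY of them fails,
// regardless of its position in the list.
suspend fun <T> List<Deferred<T>>.awaitAllFailFast(): List<T> {
    val pending = toMutableSet()
    while (pending.isNotEmpty()) {
        // select resumes with whichever pending deferred completes first;
        // the onAwait clause rethrows if that deferred completed exceptionally.
        val finished = select<Deferred<T>> {
            pending.forEach { d -> d.onAwait { d } }
        }
        pending -= finished
    }
    // Everything has completed successfully by now, so await() returns immediately.
    return map { it.await() }
}
```

With a plain map { it.await() }, a failure in the last deferred is only noticed after all earlier ones have finished; the sketch above surfaces it as soon as it happens.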
joinAll is simple map { it.join() }
onEach should be considered instead, to avoid copying the collection.
Related: I wrote this extension function which I use all the time:
/**
 * Performs [block] concurrently on each item in the iterable (with [async]), then awaits each deferred value.
 *
 * This function launches the coroutine for each value with the current coroutine context (i.e. the context
 * returned from [coroutineContext]).
 */
suspend fun <T, R> Iterable<T>.mapAsync(block: suspend (T) -> R): List<R> =
    map { async(coroutineContext()) { block(it) } }.map { it.await() }
edit: perhaps a more appropriate name would be mapConcurrent
@luisrayas3 It would also be useful if the concurrent/parallel map supported limited parallelism. The idea is to leave it named map, but add an additional parallelism parameter. This is covered in #172.
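As a rough sketch of what a map with limited parallelism could look like, assuming a Semaphore is used to cap the number of blocks running at once. The name mapConcurrent and the shape of the parallelism parameter are illustrative only and are not the design from #172.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: runs [block] on every item concurrently,
// with at most [parallelism] blocks executing at the same time.
suspend fun <T, R> Iterable<T>.mapConcurrent(
    parallelism: Int,
    block: suspend (T) -> R
): List<R> = coroutineScope {
    val permits = Semaphore(parallelism)
    this@mapConcurrent
        .map { item -> async { permits.withPermit { block(item) } } }
        .map { it.await() } // results keep the order of the input
}
```

All coroutines are started eagerly here and only the execution of block is throttled, which is fine for small collections but not for very large ones.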
Let me also add an idea by Brian Parma from the public Slack: have something like asCompleted for a list of deferreds, similar to Python's as_completed: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed
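One way such an asCompleted could be sketched is with a channel that receives each deferred once it has finished. The signature below (an extension on CoroutineScope taking a collection) is an assumption made for this example, not something from the Slack discussion.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel

// Hypothetical helper: emits the given deferreds in the order in which they complete.
fun <T> CoroutineScope.asCompleted(deferreds: Collection<Deferred<T>>): ReceiveChannel<Deferred<T>> {
    val out = Channel<Deferred<T>>(Channel.UNLIMITED)
    val watchers = deferreds.map { d ->
        launch {
            d.join()    // join() does not rethrow the failure of d itself
            out.send(d) // the consumer decides whether to call await()
        }
    }
    launch {
        // Close the channel once every watcher is done, even if this scope is cancelled.
        try { watchers.forEach { it.join() } } finally { out.close() }
    }
    return out
}
```

A consumer can then iterate with for (d in asCompleted(list)) and call d.await() on each element as it arrives.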
Some possible implementations (pseudocode, assuming the parallelism parameter proposed in #172):
fun Iterable<Job>.joinAll(): Iterable<Job> = onEach(parallelism = 1) { it.join() }
fun <E> Iterable<Deferred<E>>.awaitAll(): Iterable<E> = flatMap(parallelism = 1)
fun <E> ReceiveChannel<Deferred<E>>.asCompleted(): ReceiveChannel<Deferred<E>> = onEach(parallelism = MAX) { it.join() }
I made a simple implementation of the mapAsync method. There are some problems with how the contexts are defined, but it works okay for my cases: https://gist.github.com/fogone/a0ffa2432fb8e87f1ea007add3d402aa
This is great!
It is possible, though, to implement this while preserving the arity of the different operations and the type information they return, without coalescing their types into a list. The way it can be done is not pretty (https://github.com/arrow-kt/arrow/blob/master/modules/core/arrow-syntax/src/main/kotlin/arrow/syntax/applicative/applicative.kt#L256), but Arrow users can already do this without casting:
import arrow.effects.*
data class Joined<A, B, C>(val a: A, val b: B, val c: C)
val op1: DeferredK<A> = async { ... }.k()
val op2: DeferredK<B> = async { ... }.k()
val op3: DeferredK<C> = async { ... }.k()
val results: DeferredK<Joined<A, B, C>> =
    DeferredK.applicative().map(op1, op2, op3, { (a, b, c) ->
        Joined(a, b, c)
    })
While traversing and sequencing over List<Deferred<A>> where A is fixed is useful, I think it's more useful to preserve type information when joining parallelizable results, as you often launch N parallel ops and each one returns a differently typed response. I'm not sure whether this is within the design space of async / await, but as a user I sure would like that feature.
Here's an implementation for waiting on lists of Jobs:
https://github.com/venkatperi/kotlin-coroutines-lib/blob/master/src/main/kotlin/com/vperi/kotlinx/coroutines/experimental/JobIterables.ext.kt
@raulraja We try to keep kotlinx.coroutines idiomatic with respect to the Kotlin stdlib. We don't plan to have any multiple-arity overloads. The idiomatic way to write your code is:
data class Joined<A, B, C>(val a: A, val b: B, val c: C)
val op1: Deferred<A> = async { ... }
val op2: Deferred<B> = async { ... }
val op3: Deferred<C> = async { ... }
val results = Joined(op1.await(), op2.await(), op3.await())