Kotlinx.coroutines: Support "worker pool" pattern in actor builder and other related operators

Created on 29 Nov 2017 · 13 Comments · Source: Kotlin/kotlinx.coroutines

The actor builder should natively support the "worker pool" pattern via an additional optional concurrency parameter (originally named parallelism) that defaults to 1. Then, if you have a list of requests, they can all be processed concurrently, with a specified concurrency limit, using simple code like this:

val reqs: List<Request> = ...
val workers = actor<Request>(concurrency = n) { // proposed parameter
    for (req in channel) processRequest(req)
}
reqs.forEach { workers.send(it) }
workers.close()
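Until such a parameter exists, the same worker pool can be approximated with launch and Channel alone. The sketch below assumes kotlinx.coroutines 1.5 or later; workerPool and processAll are hypothetical names, not library API. A fixed number of coroutines all receive from one shared channel (fan-out), so no more than concurrency calls to process run at once.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper approximating the proposed actor(concurrency = n):
// `concurrency` coroutines receive from one shared channel (fan-out).
fun <E> CoroutineScope.workerPool(
    concurrency: Int,
    capacity: Int = Channel.RENDEZVOUS,
    process: suspend (E) -> Unit
): SendChannel<E> {
    require(concurrency > 0) { "concurrency must be positive" }
    val requests = Channel<E>(capacity)
    repeat(concurrency) {
        // Each worker leaves its loop once the channel is closed and drained.
        launch { for (request in requests) process(request) }
    }
    return requests
}

// Hypothetical usage: process a list of requests with at most 3 in flight.
suspend fun processAll(reqs: List<String>, processRequest: suspend (String) -> Unit) =
    coroutineScope {
        val workers = workerPool(concurrency = 3, process = processRequest)
        for (r in reqs) workers.send(r)
        workers.close() // workers drain the rest; coroutineScope then returns
    }
```

Because the workers are children of the enclosing scope, a failure in one of them cancels the whole scope rather than leaking the others.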

This particular pattern seems to be quite common, with requests either stored in a list or received from some other channel, so the proposal is to add a concurrency parameter to map and consumeEach, too, to be able to write something like:

incomingRequests.consumeEach(concurrency = n) { processRequest(it) }
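A consumeEach with a concurrency limit can also be sketched with a Semaphore instead of a fixed set of workers. This assumes kotlinx.coroutines 1.5 or later; consumeEachConcurrently is a hypothetical name. One coroutine is launched per element, but only after a permit is acquired, so the loop stops receiving (back-pressure) while all permits are taken.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.sync.Semaphore

// Hypothetical stand-in for the proposed consumeEach(concurrency = n).
suspend fun <E> ReceiveChannel<E>.consumeEachConcurrently(
    concurrency: Int,
    action: suspend (E) -> Unit
) {
    require(concurrency > 0) { "concurrency must be positive" }
    val permits = Semaphore(concurrency)
    coroutineScope {
        // consumeEach cancels the source channel if this loop fails.
        this@consumeEachConcurrently.consumeEach { element ->
            permits.acquire() // suspends while `concurrency` actions are running
            launch {
                try {
                    action(element)
                } finally {
                    permits.release()
                }
            }
        }
    } // returns only after every launched action has finished
}
```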

UPDATE: We will consistently call it concurrency here. We can have dozens of concurrent coroutines running on a single CPU core, so we reserve the name parallelism for limits on the number of CPU cores that are used.

enhancement

All 13 comments

Is this implementation of map too naive?

fun <E, R> ReceiveChannel<E>.map(
    context: CoroutineContext = kotlinx.coroutines.experimental.Unconfined,
    parallelism: Int = 1,
    transform: suspend (E) -> R
): ReceiveChannel<R> = produce(context, capacity = parallelism) {
    (0 until parallelism).map {
        launch(context) {
            consumeEach {
                send(transform(it))
            }
        }
    }.forEach { it.join() }
}

@enleur This is close. However, I'd like to have a slightly more efficient implementation that launches up to n coroutines only as they are needed, so that it starts up efficiently even for very large values of n.
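One way to start workers only as they are needed is sketched below, assuming kotlinx.coroutines 1.5 or later (for trySend); mapConcurrently and its explicit scope parameter are hypothetical. Elements are offered to idle workers through a rendezvous channel: trySend succeeds only if some worker is already suspended waiting for work. Only when that fails and fewer than concurrency workers exist is a new one launched, so even a very large concurrency costs nothing up front.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical concurrent map with lazy worker spin-up. Output order is NOT preserved.
@OptIn(ExperimentalCoroutinesApi::class)
fun <E, R> ReceiveChannel<E>.mapConcurrently(
    scope: CoroutineScope,
    concurrency: Int,
    transform: suspend (E) -> R
): ReceiveChannel<R> {
    require(concurrency > 0) { "concurrency must be positive" }
    return scope.produce {
        // Rendezvous hand-off: trySend succeeds only if a worker is waiting in receive.
        val handOff = Channel<E>()
        var workers = 0 // only touched by this producer coroutine
        for (element in this@mapConcurrently) {
            when {
                handOff.trySend(element).isSuccess -> Unit // an idle worker took it
                workers < concurrency -> {
                    workers++
                    launch {
                        send(transform(element))
                        for (next in handOff) send(transform(next))
                    }
                }
                else -> handOff.send(element) // all workers busy: wait for a free one
            }
        }
        // Idle workers leave their loops; produce closes its output after they finish.
        handOff.close()
    }
}
```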

As a proposal for an alternative implementation:

suspend fun <T> forkJoin(
        context: CoroutineContext = DefaultDispatcher,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        outerBlock: (fork: (suspend () -> T) -> Unit) -> Unit
): List<T> {
    val deferreds = ArrayList<Deferred<T>>()
    outerBlock({ deferreds.add(async(context, start) { it() }) })
    return deferreds.map { it.await() }
}

Usage example 1:

val stream = listOf(1, 2, 3).stream()
val results = forkJoin<Int> { fork ->
    stream.forEach { fork { suspendFunc(it) } }
}

Usage example 2:

val results = forkJoin<Int> { fork ->
    for (i in 1..5) {
        if (i % 2 == 0)
            continue

        fork { suspendFunc(i) }
    }
}

The main advantage is that this is quite flexible with respect to the outer "looping" code.
You are not limited to some strict interface for the outgoing data, for example stream only or channel only.
You can use any language feature to organize the fork loop: for, if, streams and so on.

Also, you are not limited to exactly one 'request' parameter for the processing function; you may use a function with any number of parameters.
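As written, forkJoin ends with deferreds.map { it.await() }, which can leak the remaining coroutines if one task fails. A variant built on coroutineScope and awaitAll avoids that; this is an adaptation under the assumption of kotlinx.coroutines 1.x, not the commenter's own code, and it drops the context/start parameters for brevity. Results keep fork order, and a failing task cancels its siblings.

```kotlin
import kotlinx.coroutines.*

// Structured variant of the proposed forkJoin: tasks are children of coroutineScope,
// so one failure cancels the others instead of leaking them.
suspend fun <T> forkJoin(
    outerBlock: (fork: (suspend () -> T) -> Unit) -> Unit
): List<T> = coroutineScope {
    val deferreds = ArrayList<Deferred<T>>()
    outerBlock { task -> deferreds.add(async { task() }) }
    deferreds.awaitAll() // results in the order the tasks were forked
}
```

Usage is unchanged from the examples above, for instance forkJoin<Int> { fork -> for (i in 1..5) if (i % 2 != 0) fork { suspendFunc(i) } }.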

Does concurrent map preserve the order?

Should we introduce an optional parameter preserveOrder: Boolean = true for some operators (i.e. map, filter, ...)?

Sometimes you need the order preserved, sometimes you do not. I wonder what the default should be, and whether it should be controlled by a boolean or whether there should be separate operators.
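When order does matter and the input is a bounded list, a minimal order-preserving sketch (assuming kotlinx.coroutines 1.5 or later; mapOrdered is a hypothetical name) gives every element an async but lets a Semaphore cap how many transforms run at once. awaitAll returns results in input order regardless of which transform finishes first. All async handles are created up front, which is why this suits lists rather than unbounded channels.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical order-preserving concurrent map over a list.
suspend fun <T, R> List<T>.mapOrdered(
    concurrency: Int,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val permits = Semaphore(concurrency)
    this@mapOrdered
        .map { item -> async { permits.withPermit { transform(item) } } }
        .awaitAll() // same order as the input list
}
```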

Note that an alternative design approach to the use-case of parallel processing is to introduce a dedicated parallel (?) combinator, so that channel.parallel().map { transform(it) } would perform transform in parallel for all incoming elements without preserving the order.

I am considering the following signature; it encapsulates the parallel blocks and allows reusing all current operators.

suspend fun <E, R> ReceiveChannel<E>.parallel(
        parallelism: Int,
        block: suspend ProducerScope<R>.(ReceiveChannel<E>) -> Unit
): ReceiveChannel<R>

or

suspend fun <E, R> ReceiveChannel<E>.parallel(
        parallelism: Int,
        block: suspend ReceiveChannel<E>.() -> ReceiveChannel<R>
): ReceiveChannel<R>
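The first signature can be sketched as a fan-out/fan-in over one source channel. Assumption: instead of being a suspend fun, this version takes an explicit CoroutineScope, because the returned channel has to outlive the call; it also assumes kotlinx.coroutines 1.5 or later. parallelism copies of block share the same source and send into one output channel, which closes once all copies finish. Order is not preserved.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Sketch of the first `parallel` signature, with an explicit scope (assumption).
@OptIn(ExperimentalCoroutinesApi::class)
fun <E, R> ReceiveChannel<E>.parallel(
    scope: CoroutineScope,
    parallelism: Int,
    block: suspend ProducerScope<R>.(ReceiveChannel<E>) -> Unit
): ReceiveChannel<R> {
    require(parallelism > 0) { "parallelism must be positive" }
    val source = this
    return scope.produce {
        repeat(parallelism) {
            // Every copy reads from the shared source and sends into this producer.
            launch { block(this@produce, source) }
        }
    }
}
```

For example, source.parallel<Int, Int>(scope, 3) { input -> for (x in input) if (x % 2 == 0) send(x * x) } squares the even elements using three concurrent coroutines.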

Let me take some time to expand on my previous message.

The idea behind it is to use a regular fork/join strategy. Fork and join using channels is pretty easy, so it is possible to use parallel pipelines to process items.

Multiple coroutines receive items from a single source ReceiveChannel and send results to the output channel.

suspend fun <E, R> ReceiveChannel<E>.pipelines(
        parallelism: Int,
        block: suspend ReceiveChannel<E>.() -> ReceiveChannel<R>
): ReceiveChannel<R>


val ids: ReceiveChannel<Int> = loadIds()
val largeItem = ids
        .pipelines(5) {
            map { loadItem(it) }
                    .filter { it.active }
        }
        .maxBy { it.size }

Unfortunately, with this syntax it is difficult to consume data in parallel, e.g. with consumeEach.

So an alternative syntax can be:

suspend fun <E, R> ReceiveChannel<E>.fork(
        parallelism: Int,
        block: suspend (ReceiveChannel<E>) -> R
): List<R>


val largeItem = ids
        .fork(5) {
            it.map { loadItem(it) }
                    .filter { it.active }
                    .maxBy { it.size }
        }
        .filterNotNull()
        .maxBy { it.size }

Obviously, consuming items in the fork function produces a List<Unit> and does not require the join phase.

I suspect that both operators are useful.
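The fork operator proposed above maps naturally onto coroutineScope plus async. A minimal sketch, assuming kotlinx.coroutines 1.5 or later: parallelism coroutines each run block on the same shared source (fan-out), and their individual results are collected into a list (the join). A failure in one block cancels the others.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Sketch of `fork`: each block sees only the elements it happens to receive.
suspend fun <E, R> ReceiveChannel<E>.fork(
    parallelism: Int,
    block: suspend (ReceiveChannel<E>) -> R
): List<R> {
    require(parallelism > 0) { "parallelism must be positive" }
    val source = this
    return coroutineScope {
        List(parallelism) { async { block(source) } }.awaitAll()
    }
}
```

Each block computes a partial result (for example a per-fork maximum), and the caller combines them, as in the .filterNotNull().maxBy { ... } step of the example above.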

I want to bump this issue.
This pattern is very common: I see questions about how to implement it at least once a week in the #coroutines channel on Kotlin Slack, and quick ad-hoc implementations often have problems (similar to the problem we had before the awaitAll extension, when simple extension functions just used map { it.await() }, which leaks coroutines in case of an error).

A potential implementation of consumeEach that spins up workers gradually:

import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel

// maxConcurrency <= 0 means "no upper limit"
suspend inline fun <E> ReceiveChannel<E>.consumeEach(
    maxConcurrency: Int,
    initialConcurrency: Int = 10,
    coroutineContext: CoroutineContext = EmptyCoroutineContext,
    crossinline action: suspend (E) -> Unit
) =
    withContext(coroutineContext) {

        require(initialConcurrency >= 0) { "Can not have a negative initialConcurrency" }
        require(maxConcurrency <= 0 || initialConcurrency <= maxConcurrency) {
            "initialConcurrency must be less than or equal to maxConcurrency"
        }

        val busy = AtomicInteger(0)

        val workers = MutableList(initialConcurrency) {
            launch {
                // for-in receives until the channel is closed and drained,
                // avoiding the race between checking isClosedForReceive and calling receive()
                for (item in this@consumeEach) {
                    busy.incrementAndGet()
                    action(item)
                    busy.decrementAndGet()
                }
            }
        }

        if (maxConcurrency > initialConcurrency || maxConcurrency <= 0) {
            while (isActive && !isClosedForReceive && (workers.size < maxConcurrency || maxConcurrency <= 0)) {
                if (busy.get() == workers.size) {
                    // receiveCatching() reports a closed channel instead of throwing
                    val result = this@consumeEach.receiveCatching()
                    if (result.isClosed) break
                    val received = result.getOrThrow()

                    workers += launch {
                        busy.incrementAndGet()
                        action(received)
                        busy.decrementAndGet()

                        for (item in this@consumeEach) {
                            busy.incrementAndGet()
                            action(item)
                            busy.decrementAndGet()
                        }
                    }
                }
                delay(10)
            }
        }

        workers.joinAll()
    }

I really dislike that while loop that checks sizes. It may be possible to do some kind of fake observable on busy and only launch a watcher coroutine when it hits the max (and cancel it when it drops down).

Either way, it shouldn't be too terrible, as it quits once the spin-up is done and will often be waiting on receive().

I'm also not sure whether the joinAll() at the end is necessary, as AFAIK the coroutineScope should do any cleanup, but I'm not sure enough to leave it off.

This pattern is common enough even outside of actors (e.g. making a lot of web requests, but only having 10 in flight at a time) that it seems worth having a separate API for launching coroutines with a limit of n running at once, and using that here, rather than vice versa. At the very least there should be something similar for produce.

Something like:

coroutineScope{
    limitedConcurrency(concurrency = 10){
        (1..100).forEach{
            launch{ doThing() }
        }
    }
}

Only 10 doThings would be executing at any given time.

Any launches inside it would either be redirected to a worker thread, be forced to be lazy and started once there is room, or have the block held until there is room and then be launched.
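The third option (hold the caller until there is room, then launch) can be sketched with a receiver class whose member launch shadows the usual CoroutineScope.launch extension inside the block. LimitedScope and limitedConcurrency are hypothetical names, and kotlinx.coroutines 1.5 or later is assumed. Note that this limits concurrency, not parallelism: suspended tasks still occupy a slot.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore

// Hypothetical receiver for the limitedConcurrency { } idea.
class LimitedScope(private val scope: CoroutineScope, concurrency: Int) {
    private val permits = Semaphore(concurrency)

    // A member function, so inside the block it wins over the CoroutineScope.launch extension.
    // It suspends the caller until fewer than `concurrency` tasks are running.
    suspend fun launch(task: suspend () -> Unit): Job {
        permits.acquire()
        return scope.launch {
            try {
                task()
            } finally {
                permits.release()
            }
        }
    }
}

// Returns after every task launched inside `block` has completed.
suspend fun limitedConcurrency(concurrency: Int, block: suspend LimitedScope.() -> Unit) =
    coroutineScope { LimitedScope(this, concurrency).block() }
```

With this sketch, the snippet above compiles as written: (1..100).forEach { launch { doThing() } } inside limitedConcurrency(concurrency = 10) keeps at most 10 doThing calls running.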

I agree. Coming from RxJava, I really wish there were something like flatMap() with maxConcurrency that doesn't require channels.
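Flow's flatMapMerge operator, which takes a concurrency parameter and is marked @FlowPreview, is close to this: it collects at most concurrency inner flows at a time, similar in spirit to RxJava's flatMap(mapper, maxConcurrency). The helper name concurrentMap below is hypothetical; results arrive in completion order, not input order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: apply `transform` with at most `concurrency` calls in flight.
@OptIn(FlowPreview::class)
fun <T, R> Flow<T>.concurrentMap(concurrency: Int, transform: suspend (T) -> R): Flow<R> =
    flatMapMerge(concurrency) { value -> flow { emit(transform(value)) } }
```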
