Kotlinx.coroutines: Go-like WaitGroup to wait multiple coroutines to finish

Created on 9 May 2017 · 5 Comments · Source: Kotlin/kotlinx.coroutines

Hi,
This is more of a question than an issue: are there any plans to add a WaitGroup-like mechanism? The Go documentation for WaitGroup is here.

The problem it is supposed to solve is how to wait for a bunch of coroutines to complete. Here is a nice summary. Currently this is achievable with channels (a sort of "done" channel) or simply with jobs.forEach { it.join() }. However, the latter requires allocating an extra list or array, whereas a WaitGroup is effectively an atomic counter wrapper with some waiting logic in it (see the examples). Although it can be implemented quickly, it might still be useful as part of kotlinx.coroutines.
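For reference, the jobs.forEach { it.join() } pattern mentioned above can be written in current kotlinx.coroutines releases with the joinAll() extension on a collection of jobs. This is a minimal sketch, assuming a recent kotlinx.coroutines (not the 2017 API used elsewhere in this thread); simulateWork and runAll are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for doSomething(): waits a bit, then counts itself as completed.
suspend fun simulateWork(thing: String, completed: AtomicInteger) {
    delay(10L * thing.length)
    completed.incrementAndGet()
}

// Launches one coroutine per item, keeps every returned Job, then waits for all of them.
fun runAll(things: List<String>): Int = runBlocking {
    val completed = AtomicInteger()
    val jobs = mutableListOf<Job>()
    for (thing in things) {
        jobs += launch { simulateWork(thing, completed) }
    }
    jobs.joinAll() // same effect as jobs.forEach { it.join() }
    completed.get()
}
```

Calling runAll(listOf("one", "two", "three")) returns 3, because joinAll() only returns after every launched job has completed.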

All 5 comments

What about select?

Can we make it work like that?

select could be an option; however, I think the question here is not how to wait (select / channel.receive()) but what to wait for. Consider this example:
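To illustrate the "how vs. what" distinction: select suspends until the first of its clauses fires, so on its own it waits for one event, not for a whole group. A small sketch, assuming a recent kotlinx.coroutines in which Job.onJoin can be used as a select clause; firstToFinish is a hypothetical name.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Starts two coroutines and uses select to wait for whichever finishes first.
fun firstToFinish(): String = runBlocking {
    val fast = launch { delay(10) }
    val slow = launch { delay(500) }
    val winner = select<String> {
        fast.onJoin { "fast" }
        slow.onJoin { "slow" }
    }
    // select resumed after a single job completed; the other one is still running.
    slow.cancel()
    winner
}
```

firstToFinish() returns "fast" while the slow job is still active, which is why select alone does not answer which coroutines to wait for.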

val listOfThings = Flux.fromArray(arrayOf("one", "two"))
suspend fun doSomething(thing: String, output: Channel<String>) {
    delay(1000)
    output.send(thing)
}
fun test() = publish(CommonPool) {
    val output = Channel<String>()

    // start coroutine per thing (1)
    listOfThings.consumeEach {
        launch(context) {
            doSomething(it, output)
        }
    }

    launch(context) {
        for (t in output) {
            send(t)
        }
    }

    // wait for all the coroutines to finish? (2)
}

How can we wait for all the coroutines created at (1) to finish, given that we don't know how many things are in listOfThings? I see a couple of options here.

  1. Use val jobs = arrayListOf<Job>(), append each launched job to the list inside consumeEach, then at (2) just call jobs.forEach { it.join() }. A nice option, but it needs an extra list allocation that shouldn't be necessary.
  2. Use a counter + a done channel.
fun test() = publish(CommonPool) {
    val output = Channel<String>()
    val done = Channel<Unit>()
    val totalJobs = AtomicInteger()

    // start coroutine per thing
    listOfThings.consumeEach {
        totalJobs.incrementAndGet()
        launch(context) {
            doSomething(it, output)
            done.send(Unit)
        }
    }

    launch(context) {
        for (t in output) {
            send(t)
        }
    }

    // wait for all the coroutines to finish
    repeat(totalJobs.get()) {
        done.receive()
    }
}

Works, but a little bit verbose.
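The counter + done channel option can be sketched as a self-contained program with the current kotlinx.coroutines API (an assumption; the thread uses the 2017 publish/CommonPool API). Since the counter is only incremented by the parent coroutine, a plain Int is enough here instead of an AtomicInteger. runWithDoneChannel is a hypothetical name, and the delay stands in for real work.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Counter + "done" channel: each worker sends exactly one completion signal,
// and the parent receives exactly as many signals as workers it launched.
fun runWithDoneChannel(things: List<String>): List<String> = runBlocking {
    val done = Channel<String>()
    var launched = 0 // only the parent coroutine touches this counter
    for (thing in things) {
        launched++
        launch {
            delay(10L * thing.length) // hypothetical work
            done.send(thing)          // completion signal for this worker
        }
    }
    // Wait for all workers: one receive per launched coroutine.
    val finished = mutableListOf<String>()
    repeat(launched) { finished += done.receive() }
    finished
}
```

The returned list holds the items in completion order; it has one entry per launched worker, which is what makes the repeat(launched) loop a correct "wait for all".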

And this is what WaitGroup can do for you:

fun test() = publish(CommonPool) {
    val output = Channel<String>()
    val waitGroup = WaitGroup()

    // start coroutine per thing
    listOfThings.consumeEach {
        waitGroup.add()
        launch(context) {
            doSomething(it, output)
            waitGroup.done()
        }
    }

    launch(context) {
        for (t in output) {
            send(t)
        }
    }

    // wait for all the coroutines to finish
    waitGroup.wait()
}

With WaitGroup implemented as follows:

class WaitGroup(initial: Int = 0) {
    private val counter = AtomicInteger(initial)
    private val waiter = Channel<Unit>()

    suspend fun add(delta: Int = 1) {
        val counterValue = counter.addAndGet(delta)
        when {
            counterValue == 0 -> waiter.send(Unit)
            counterValue < 0 -> throw IllegalStateException()
        }
    }

    suspend fun done() = add(-1)

    suspend fun wait() = waiter.receive()
}

I do not immediately see how WaitGroup makes the code cleaner. Compare an example with WaitGroup to an example with a list of jobs. This is a line-by-line translation:

fun test() = publish(CommonPool) {
    val output = Channel<String>()
    val jobs = mutableListOf<Job>() // replaces waitGroup

    // start coroutine per thing
    listOfThings.consumeEach {
        jobs += launch(context) { // replaces waitGroup.add(), no need for waitGroup.done() 
            doSomething(it, output)
        }
    }

    launch(context) {
        for (t in output) {
            send(t)
        }
    }

    // wait for all the coroutines to finish
    jobs.forEach { it.join() } // replaces waitGroup.wait()
}

It looks cleaner and less error-prone without WaitGroup. In particular, the example with WaitGroup has a subtle bug that would reproduce only rarely (a heisenbug). Under certain load, the first waitGroup.add() could be immediately followed by an invocation of waitGroup.done() (the first coroutine finishes before consumeEach delivers the next element, so the counter drops back to zero), thus triggering WaitGroup's notification prematurely. There is no risk of making this mistake with a list of jobs.
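The race described above can be made deterministic on a single thread. This sketch, assuming a recent kotlinx.coroutines, re-creates the WaitGroup posted earlier in this thread under the hypothetical name EdgeWaitGroup, and uses yield() to play the role of the producer suspending between items. The first worker finishes immediately and the second one is slow.

```kotlin
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicInteger

// The WaitGroup from the thread: it signals every time the counter drops to zero.
class EdgeWaitGroup(initial: Int = 0) {
    private val counter = AtomicInteger(initial)
    private val waiter = Channel<Unit>() // rendezvous: send() suspends until someone receives

    suspend fun add(delta: Int = 1) {
        val counterValue = counter.addAndGet(delta)
        when {
            counterValue == 0 -> waiter.send(Unit)
            counterValue < 0 -> throw IllegalStateException()
        }
    }

    suspend fun done() = add(-1)

    suspend fun wait() = waiter.receive()
}

// Returns how many workers had finished at the moment wait() returned.
fun finishedWhenWaitReturned(): Int = runBlocking {
    val wg = EdgeWaitGroup()
    val finished = AtomicInteger()
    for (d in listOf(0L, 100L)) { // first worker is fast, second is slow
        wg.add()
        launch {
            delay(d)
            finished.incrementAndGet()
            wg.done()
        }
        yield() // hypothetical stand-in for the producer suspending between items
    }
    // Receives the signal the first worker sent when the counter hit zero early.
    wg.wait()
    val seen = finished.get()
    // The slow worker would otherwise suspend forever in send(), since nobody receives again.
    coroutineContext.cancelChildren()
    seen
}
```

finishedWhenWaitReturned() returns 1: wait() returns while the second worker is still running.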

Please keep in mind that in Go there is nothing like a Job, so there is simply no other way to wait for a group of goroutines to finish. Go has to have something like WaitGroup, with all its error-proneness. But we don't have to.
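For comparison only: Go's WaitGroup.Wait blocks until the counter is zero at the time it is called, instead of consuming a notification that was sent earlier. A sketch of that level-triggered behavior in Kotlin, assuming a recent kotlinx.coroutines (MutableStateFlow, update, first); LevelWaitGroup is a hypothetical class, not a kotlinx.coroutines API. As in Go, every add() must still happen before wait() is called.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical Go-style WaitGroup: wait() observes the current counter value.
class LevelWaitGroup {
    private val counter = MutableStateFlow(0)

    fun add(delta: Int = 1) {
        counter.update { current ->
            val next = current + delta
            check(next >= 0) { "negative WaitGroup counter" }
            next
        }
    }

    fun done() = add(-1)

    // Returns immediately if the counter is already zero, otherwise suspends until it is.
    suspend fun wait() {
        counter.first { it == 0 }
    }
}

// Same fast/slow workers; returns how many had finished when wait() returned.
fun finishedWhenLevelWaitReturned(): Int = runBlocking {
    val wg = LevelWaitGroup()
    val finished = AtomicInteger()
    for (d in listOf(0L, 100L)) {
        wg.add()
        launch {
            delay(d)
            finished.incrementAndGet()
            wg.done()
        }
        yield()
    }
    wg.wait()
    finished.get()
}
```

Here finishedWhenLevelWaitReturned() returns 2, but the add()/done() bookkeeping is still manual, which is the error-proneness the list of jobs avoids.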

Also note that in kotlinx.coroutines a Job is already created for each coroutine you launch. Allocating a list to hold them is a relatively minor, constant-fraction overhead on top of that.
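In later kotlinx.coroutines releases, structured concurrency removes even the job list: coroutineScope { } returns only after every coroutine launched inside it has completed. A minimal sketch, assuming a recent kotlinx.coroutines; processAll and processAllBlocking are hypothetical names, and the delay stands in for real work.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// coroutineScope waits for all its children, so no explicit list or counter is needed.
suspend fun processAll(things: List<String>): Int {
    val processed = AtomicInteger()
    coroutineScope {
        for (thing in things) {
            launch {
                delay(10L * thing.length) // hypothetical work
                processed.incrementAndGet()
            }
        }
    } // resumes here only after every launched child has completed
    return processed.get()
}

fun processAllBlocking(things: List<String>): Int = runBlocking { processAll(things) }
```

processAllBlocking(listOf("one", "two", "three")) returns 3; a failure in any child also cancels its siblings and is rethrown from coroutineScope.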

Thanks for the clarification, it makes sense. Good point about Go not having a Job class!
