Kotlinx.coroutines: `completionHandler` is now an internal API of the `ReceiveChannel` operators - what's the alternative?

Created on 18 Feb 2019 · 1 Comment · Source: Kotlin/kotlinx.coroutines

I wrote two custom RxJava-style operators to use until #437 / #254 are addressed. To ensure that the source channels are cleaned up correctly, I used onCompletion = consumesAll(*sources) (and onCompletion = consumes() for the single-source case). My 0.x code is below:

/**
 * Merges multiple [ReceiveChannel]s of the same type [T] into a single [ReceiveChannel]
 */
fun <T> merge(
    vararg sources: ReceiveChannel<T>,
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<T> =
    GlobalScope.produce(context, onCompletion = consumesAll(*sources)) {
        sources.forEach { source ->
            launch { source.consumeEach { send(it) } }
        }
    }

/**
 * Emit [E] only when it is not equal to the previous emission. The first emission will always be
 * emitted. Use this to avoid emitting the same value twice in a row.
 *
 * `.equals()` equality comparison will be used.
 *
 * Equivalent to RxJava's `.distinctUntilChanged()` operator.
 */
fun <E> ReceiveChannel<E>.distinctUntilChanged(
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<E> = GlobalScope.produce(context, onCompletion = consumes()) {
    val last = AtomicReference<E>()
    var wasInitialized = false

    consumeEach { emission ->
        if (!wasInitialized) {
            // first emission
            last.set(emission)
            wasInitialized = true
            send(emission)
        } else {
            // we have a previous emission to compare to
            if (emission != last.get()) {
                // a distinct value has appeared
                last.set(emission)
                send(emission)
            }
        }
    }
}

Unfortunately for me, the 1.x APIs for this are now marked @InternalCoroutinesApi because they take an onCompletion: CompletionHandler parameter. How can I modify these operators so that I don't need to rely on the internal onCompletion API?

Would changing it to the following be a safe alternative? If so, could this be pointed out in the documentation?

GlobalScope.produce(context) {
    this@produce.coroutineContext[Job]!!.invokeOnCompletion(consumesAll(*sources))
    // ...
}
Labels: docs, question

Most helpful comment

Would changing it to the following be a safe alternative?

Yes, this is how it is actually implemented internally. Also, this is probably a good argument for #934.

If so, could this be pointed out in the documentation?

Not the part with invokeOnCompletion, but the part about produce's own Job and its lifecycle. Thanks for pointing that out.

This API became internal because we are currently actively prototyping cold streams.
With cold streams, we expect all intermediate operators on channels to no longer be needed (because channels were not intended to be used as an Rx replacement), and the API of produce, actor, and their operators will then be revisited.
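
For illustration, here is a minimal sketch of how both operators from the question might look when the cleanup is registered on the produce coroutine's own Job instead of through the internal onCompletion parameter. It is not the library's implementation. It assumes a recent kotlinx.coroutines release (where @OptIn, ExperimentalCoroutinesApi and DelicateCoroutinesApi are available), and it calls cancel() on the sources directly rather than going through consumes() / consumesAll(). The main function and the sample values are hypothetical and only there to exercise the code.

```kotlin
@file:OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.CoroutineContext

/** Sketch: merges several channels; cancels every source when the producer completes. */
fun <T> merge(
    vararg sources: ReceiveChannel<T>,
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<T> =
    GlobalScope.produce(context) {
        // Public Job API: runs once this produce coroutine completes for any
        // reason (normal completion, failure, or cancellation).
        coroutineContext[Job]!!.invokeOnCompletion { _ ->
            sources.forEach { source -> source.cancel() }
        }
        sources.forEach { source ->
            launch { for (element in source) send(element) }
        }
    }

/** Sketch: drops an element when it equals the element emitted just before it. */
fun <E> ReceiveChannel<E>.distinctUntilChanged(
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<E> {
    val source = this
    return GlobalScope.produce(context) {
        coroutineContext[Job]!!.invokeOnCompletion { _ -> source.cancel() }
        var hasPrevious = false
        var previous: E? = null
        for (element in source) {
            if (!hasPrevious || element != previous) {
                hasPrevious = true
                previous = element
                send(element)
            }
        }
    }
}

// Hypothetical usage, only to exercise the two sketches above.
fun main() = runBlocking {
    val distinct = mutableListOf<Int>()
    val repeated = produce<Int> { listOf(1, 1, 2, 2, 1).forEach { send(it) } }
    for (value in repeated.distinctUntilChanged()) distinct += value
    println(distinct) // prints [1, 2, 1]

    val merged = mutableListOf<Int>()
    val first = produce<Int> { send(1); send(3) }
    val second = produce<Int> { send(2) }
    for (value in merge(first, second)) merged += value
    println(merged.sorted()) // sorted because the interleaving order is not guaranteed
}
```

Because the handler is attached to the producer's Job, the sources are cancelled whether the returned channel is fully consumed or cancelled early by the consumer.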

