I wrote two custom RxJava-style operators to use until #437 / #254 are addressed. To make sure the source channels are cleaned up correctly, I used onCompletion = consumesAll(*sources) (and onCompletion = consumes() for the single-source operator). My 0.x code is below:
/**
 * Merges multiple [ReceiveChannel]s of the same type [T] into a single [ReceiveChannel]
 */
fun <T> merge(
    vararg sources: ReceiveChannel<T>,
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<T> =
    GlobalScope.produce(context, onCompletion = consumesAll(*sources)) {
        sources.forEach { source ->
            launch { source.consumeEach { send(it) } }
        }
    }
/**
 * Emit [E] only when it is not equal to the previous emission. The first emission will always be
 * emitted. Use this to not emit the same value twice in a row.
 *
 * `.equals()` equality comparison will be used.
 *
 * Equivalent to RxJava's `.distinctUntilChanged()` operator.
 */
fun <E> ReceiveChannel<E>.distinctUntilChanged(
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<E> = GlobalScope.produce(context, onCompletion = consumes()) {
    val last = AtomicReference<E>()
    var wasInitialized = false
    consumeEach { emission ->
        if (!wasInitialized) {
            // first emission
            last.set(emission)
            wasInitialized = true
            send(emission)
        } else {
            // we have a previous emission to compare to
            if (emission != last.get()) {
                // a distinct value has appeared
                last.set(emission)
                send(emission)
            }
        }
    }
}
Unfortunately for me, in 1.x these APIs are now marked @InternalCoroutinesApi because of their use of onCompletion: CompletionHandler. How can I modify these operators so that I don't need to rely on the internal onCompletion API?
Would changing it to the following be a safe alternative? If so, could this be pointed out in the documentation?
GlobalScope.produce(context) {
    this@produce.coroutineContext[Job]!!.invokeOnCompletion(consumesAll(*sources))
    // ...
}
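For illustration, here is a minimal sketch of how the merge operator could be written against public API only. It assumes that cancelling each source from the producer's completion handler is an acceptable stand-in for consumesAll(*sources); the name mergeChannels is hypothetical, and the opt-in annotations assume a recent kotlinx.coroutines version where produce and GlobalScope require them.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.CoroutineContext

// Hypothetical name; a sketch, not the library's implementation.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun <T> mergeChannels(
    vararg sources: ReceiveChannel<T>,
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<T> = GlobalScope.produce(context) {
    // Assumption: cancelling every source when the producer's Job completes
    // (normally, with an error, or by cancellation) replaces consumesAll(*sources).
    coroutineContext[Job]!!.invokeOnCompletion { _ ->
        sources.forEach { source -> source.cancel() }
    }
    // One child coroutine per source forwards its elements downstream.
    sources.forEach { source ->
        launch { for (item in source) send(item) }
    }
}

fun main() = runBlocking {
    val a = Channel<Int>(Channel.UNLIMITED).apply { trySend(1); trySend(2); close() }
    val b = Channel<Int>(Channel.UNLIMITED).apply { trySend(3); close() }
    // Interleaving between sources is not guaranteed, so sort before printing.
    println(mergeChannels(a, b).toList().sorted()) // [1, 2, 3]
}
```

The handler is registered before any child is launched, so it is in place no matter how early the producer finishes or is cancelled.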
Would changing it to the following be a safe alternative?
Yes, this is how it is actually implemented internally. This is also probably a good argument for #934.
If so, could this be pointed out in the documentation?
Not the part about invokeOnCompletion, but the part about produce having its own Job and that Job's lifecycle. Thanks for pointing that out.
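To make the point about produce having its own Job concrete, here is a hedged sketch of the single-source operator that ties upstream cleanup to that Job. The name distinctUntilChangedSketch is hypothetical, and cancelling the upstream channel in the completion handler is assumed to be an adequate replacement for consumes().

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.CoroutineContext

// Hypothetical name; a sketch under the assumptions stated above.
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
fun <E> ReceiveChannel<E>.distinctUntilChangedSketch(
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<E> {
    val upstream = this
    return GlobalScope.produce(context) {
        // The producer coroutine has its own Job; when that Job completes
        // for any reason, the upstream channel is cancelled.
        coroutineContext[Job]!!.invokeOnCompletion { _ -> upstream.cancel() }
        var hasPrevious = false
        var previous: E? = null
        for (item in upstream) {
            // Forward the first element, then only elements that differ
            // from the one forwarded just before.
            if (!hasPrevious || item != previous) {
                hasPrevious = true
                previous = item
                send(item)
            }
        }
    }
}

fun main() = runBlocking {
    val source = Channel<Int>(Channel.UNLIMITED)
    listOf(1, 1, 2, 2, 1).forEach { source.trySend(it) }
    source.close()
    println(source.distinctUntilChangedSketch().toList()) // [1, 2, 1]
}
```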
This API became internal because we are currently actively prototyping cold streams.
With cold streams, we expect all intermediate operators on channels to no longer be needed (because channels were not intended to be used as an Rx replacement), and then the API of produce, actor, and their operators will be revisited again.
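As a rough illustration, and assuming the cold-stream prototype mentioned here corresponds to the Flow API that later shipped in kotlinx.coroutines, both operators from the question exist there as built-ins, so no manual cleanup of source channels is involved. The opt-in annotation is only a precaution for versions where merge was still experimental.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    // merge collects all flows concurrently; element order across flows
    // is not guaranteed, so sort before printing.
    val merged = merge(flowOf(1, 2), flowOf(3)).toList().sorted()
    println(merged) // [1, 2, 3]

    // distinctUntilChanged drops consecutive duplicates only.
    val distinct = flowOf(1, 1, 2, 2, 1).distinctUntilChanged().toList()
    println(distinct) // [1, 2, 1]
}
```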