I'm trying to convert an Android CountDownTimer into emitting values via callbackFlow but it throws ClosedSendChannelException: Channel was closed when collecting the items. I can't really find a valid reason why this happens.
@ExperimentalCoroutinesApi
class TimerFlow private constructor(millisInFuture: Long, countDownInterval: Long) {
private val tick: Flow<Long> = callbackFlow {
object : CountDownTimer(millisInFuture, countDownInterval) {
override fun onFinish() {
offer(0L)
}
override fun onTick(millisUntilFinished: Long) {
Log.i("TimerFlow","$millisUntilFinished")
offer(millisUntilFinished)
}
}.start()
}
companion object {
/**
* Create a [Flow] that will be a countdown until a specified time in the future.
*
* @param millisInFuture The milliseconds in the future that this will countdown to.
* @param countDownInterval The minimum amount of time between emissions.
*/
@JvmStatic
fun create(millisInFuture: Long, countDownInterval: Long) =
TimerFlow(millisInFuture, countDownInterval).tick
}
}
Is this a bug or am I missing something? I'm new to Coroutines.
Reason is that callbackFlow block closes the (hidden under the hood) channel, as soon, as everything within
callbackFlow {
...
}
brackets gets executed. So you start your timer and callbackFlow thinks you are done, so it closes the channel, and your timer cannot offer it any values.
In order to keep your callbackFlow alive, while the Timer is ticking, you need to suspend it. Preferably like this:
callbackFlow {
//register listener / timer
awaitClose { //unregister your listener, which is calling 'offer' }
}
The awaitClose {} will keep callbackFlow active, until it gets cancelled manually (either via your listener or by the consumer of the flow).
I agree, that it is not very intuitive, especially for former RxJava users, where Observable.create behaves in opposite way - it will stay alive by default, but you need to complete it manually.
Perhaps having callbackFlow behave, like Observable.create would be a good thing (and channelFlow staying with the current implementation). I do not know.
PS: edited wrong function name
The above answer is generally correct.
This behaviour is described in the documentation and proposes a way to resolve it:
Use awaitClose as the last statement to keep it running. awaitClose argument is called when either flow consumer cancels flow collection or when callback-based API invokes SendChannel.close manually.
This is done intentionally to indicate that not only consumer can complete the flow, but it also can be cancelled (and then callback should be unregistered).
What we can do better here is to add a clear exception message if callbackFlow is closed without by its block
Maybe callbackFlow should accept two lambdas as it parameters:
And it would be 'always active' by default.
Again, I'm not sure. Just food for thought.
Thanks for the input. We were considering a similar API initially but found that it has a way too many limitations.
Consider the following example from the documentation:
fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
val callback = object : Callback {
override fun onNextValue(value: T) {
offer(value)
}
... other methods ...
}
api.register(callback)
// Suspend until either onCompleted or external cancellation are invoked
awaitClose { api.unregister(callback) }
}
To properly close this API, you should have an instance of the callback in your hands. So fun <T> callbackFlow(onClose: () -> Unit, onStart: (ProducerScope<T>.() -> Unit)) is not going to work.
It can be changed to fun <T, C> callbackFlow(onClose: (C) -> Unit, onStart: (ProducerScope<T>.() -> C) though.
But what if both api and callback are created within a onStart? What if onStart and onClose share some common local variables, e.g. diagnostic info? API immediately becomes bloated and unreadable, so awaitClose was the lesser evil
Thanks for the heads up guys. It makes sense now. Coming from RxJava I would expect a behavior similar to observable where you manually have to call onComplete() to terminate it.
Now that the rationale was given I'm not sure if it is a good idea to make channelFlow/callbackFlow stay alive by default or if it should even be intended as an observable alternative. You guys decide that.
What we can do better here is to add a clear exception message if callbackFlow is closed without by its block
Yes that would be a good idea I think.
So I modified a little bit my code to look like this:
@ExperimentalCoroutinesApi
class TimerFlow private constructor(millisInFuture: Long, countDownInterval: Long) {
private val tick: Flow<Long> = callbackFlow {
if (Looper.myLooper() == null) {
throw IllegalStateException("Can't create TimerFlow inside thread that has not called Looper.prepare() Just use Dispatchers.Main")
}
object : CountDownTimer(millisInFuture, countDownInterval) {
override fun onFinish() {
cancel()
}
override fun onTick(millisUntilFinished: Long) {
offer(millisUntilFinished)
}
}.start()
awaitClose()
}
companion object {
/**
* Create a [Flow] that will be a countdown until a specified time in the future.
*
* @param millisInFuture The milliseconds in the future that this will countdown to.
* @param countDownInterval The minimum amount of time between emissions.
*/
@JvmStatic
fun create(millisInFuture: Long, countDownInterval: Long) =
TimerFlow(millisInFuture, countDownInterval).tick
}
}
Afterwards I call the above class like this:
class MainActivity : AppCompatActivity() {
@ExperimentalCoroutinesApi
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
btnStart.setOnClickListener {
CoroutineScope(Dispatchers.Main).launch {
setCountDown(5000, 1000)
}
}
}
@ExperimentalCoroutinesApi
private suspend fun setCountDown(millisInFuture: Long, countDownInterval: Long) {
TimerFlow.create(millisInFuture, countDownInterval).collect {
Log.i("main", it.toString())
textView.text = it.toString()
}
}
}
Most helpful comment
Thanks for the input. We were considering a similar API initially but found that it has a way too many limitations.
Consider the following example from the documentation:
To properly close this API, you should have an instance of the
callbackin your hands. Sofun <T> callbackFlow(onClose: () -> Unit, onStart: (ProducerScope<T>.() -> Unit))is not going to work.It can be changed to
fun <T, C> callbackFlow(onClose: (C) -> Unit, onStart: (ProducerScope<T>.() -> C)though.But what if both
apiandcallbackare created within aonStart? What ifonStartandonCloseshare some common local variables, e.g. diagnostic info? API immediately becomes bloated and unreadable, soawaitClosewas the lesser evil