Kotlinx.coroutines: kotlin.coroutines.channels.awaitClose: JobCancellationException

Created on 15 Jan 2020  路  7Comments  路  Source: Kotlin/kotlinx.coroutines

The awaitClose function currently throws a JobCancellationException if the job was canceled before calling the function.

I would have personally expected that the block passed to awaitClose {聽} gets executed without throwing this exception.

Sample code:

internal fun SensorManager.consumeValuesAsFlow(
    sensor: Sensor,
    measureInterval: Duration = 20L with TimeUnit.MilliSeconds
): Flow<FloatArray> {
    return callbackFlow {
        val sensorEventListener = object : SensorEventListener {
            override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) = Unit
            override fun onSensorChanged(event: SensorEvent) {
                catch { offer(event.values) }
            }
        }

        registerListener(
            sensorEventListener,
            sensor,
            measureInterval[TimeUnit.MicroSeconds].toInt(),
            sensorHandler
        )
        try {
            awaitClose { unregisterListener(sensorEventListener) }
        } catch (t: Throwable) {
            Log.e("SensorManager", "consumeValuesAsFlow", t) // called with JobCancellationException
        }
    }
}

Coroutines version 1.3.3
Kotlin version 1.3.61

docs

Most helpful comment

I wrote an extension function to prevent JobCancellationException in our app. Maybe it will be useful for someone who still looking for a solution. You must replace offer to offerSafe inside of callbackFlow.

inline fun <reified E : Any> ProducerScope<E>.offerSafe(element: E) {
    if (isActive) {
        offer(element)
    }
}

All 7 comments

I would have personally expected that the block passed to awaitClose { } gets executed without throwing this exception.

Could you please elaborate on why? awaitClose is a regular cancellable suspend function and thus it behaves like one.

And additional non-suspending block parameter is here to indicate that
1) A coroutine can be cancelled, so suspension calls in a close sequence may be inappropriate
2) Code in the block is not semantically the same as writing the same code after awaitClose

I think what confused me is the statement in the docs:

"Suspends the current coroutine until the channel is either closed or cancelled and invokes the given block before resuming the coroutine."

This made it sound like a good fit for a cleanup function. It waits until the coroutine/channel gets closed/canceled to finally do some last peace of work. It never came to my mind that it would throw an exception if it is already canceled or closed! Intuitively, I thought, it would just continue calling the block (which it does, but throws the exception also)

Maybe my intuition on this function is just different because I used it in that specific context 鈽猴笍

Thanks! I will improve the documentation of awaitClose to be more straightforward, then

The awaitClose method is OK which will not throw JobCancellationException because it's a cancellable suspended function.

But the offer method is not OK especially in multi-thread, so maybe the job was canceled, but it still calls the offer method which will throw JobCancellationException.

And the JobCancellationException is really confused because it will have less stack trace, just kotlinx.coroutines.JobCancellationException: Job was cancelled; job=SupervisorJobImpl{Cancelling}@..., so I hope you can give more information about JobCancellationException if it was thrown when coroutine doesn't ignore the Exception.

@ExperimentalCoroutinesApi
fun main(args: Array<String>) = runBlocking {
  val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
  scope.launch {
    getFlow().collect {
      println(it)
    }
  }
  Thread.sleep(10)
  scope.cancel()
  Thread.sleep(1000)
}

@ExperimentalCoroutinesApi
fun getFlow(): Flow<String> {
  return callbackFlow {
    Thread {
      for (i in 0 until 10) {
        val temp = "flow ==> $i"
        offer(temp)
        Thread.sleep(5)
      }
    }.start()
    Thread.sleep(1000)
  }
  // no awaitClose for better testing
}

This is an example I just test because I got lots of crashes because of the JobCancellationException.

It will not just happen every time, but it's really frequently.

Yes, offer is defined to throw an exception on a closed channel: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
This could be surprising and there've been other requests to fix it.
Please, follow #974

I wrote an extension function to prevent JobCancellationException in our app. Maybe it will be useful for someone who still looking for a solution. You must replace offer to offerSafe inside of callbackFlow.

inline fun <reified E : Any> ProducerScope<E>.offerSafe(element: E) {
    if (isActive) {
        offer(element)
    }
}
Was this page helpful?
0 / 5 - 0 ratings

Related issues

ScottPierce picture ScottPierce  路  3Comments

iTanChi picture iTanChi  路  3Comments

mariusstaicu picture mariusstaicu  路  3Comments

mttmllns picture mttmllns  路  3Comments

elizarov picture elizarov  路  3Comments