Kotlinx.coroutines: Cancel current Job on RejectedExecutionException?

Created on 24 Oct 2020 · 3 comments · Source: Kotlin/kotlinx.coroutines

Since 1.4.0-M1 and #2012, a job is cancelled on RejectedExecutionException.
This led to a breaking change in my library, but I understand the reasoning behind it. Unless I'm missing something, it looks to me like the cancellation of rejected jobs has side effects which I cannot explain.

I created the snippet below to replicate the issue. I create a ThreadPoolExecutor and make a dispatcher out of it with asCoroutineDispatcher(). Then I launch 11 coroutines (0..10) on that dispatcher, inside a supervisorScope. On my machine, 7 of them are rejected. However, it looks like the supervisorScope waits forever.

import kotlinx.coroutines.*
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

val ceh = CoroutineExceptionHandler { _, t ->
    t.printStackTrace()
}

fun main() = runBlocking(ceh) {
    val rejectCnt = AtomicInteger(0)
    val pool = ThreadPoolExecutor(0, 4, 0L, TimeUnit.SECONDS, SynchronousQueue()) { r, e ->
        println("rejected")
        rejectCnt.incrementAndGet()
    }
    val dispatcher = pool.asCoroutineDispatcher()

    supervisorScope {
        (0..10).map {
            launch(dispatcher) {
                delay(100)
            }
        }
    }

    println("Why don't we reach that line?")
    Unit
}

The output on my machine:

rejected
rejected
rejected
rejected
rejected
rejected
rejected

For the record, I solved my issue by replacing the SynchronousQueue with a LinkedBlockingQueue, so I no longer get rejections. But I'd like to understand what happens here.

All 3 comments

This is because your ThreadPoolExecutor never throws RejectedExecutionException. Your custom RejectedExecutionHandler replaces the default one, but it neither throws RejectedExecutionException nor re-dispatches the given runnable, so the rejected runnables are silently dropped and their coroutines never complete.
To fix this test, just add throw RejectedExecutionException() right after rejectCnt.incrementAndGet().
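The suggestion above can be sketched without coroutines, using only the JDK. This is a minimal illustration, not the library's code: the helper name submitWithThrowingHandler, the "pool saturated" message, and the latch that keeps the workers busy are assumptions made for the example. It shows that once the handler rethrows, the caller of execute() sees every rejection, which is what a dispatcher built with asCoroutineDispatcher() needs in order to react.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not part of any library): submits `tasks` blocking runnables
// to a pool configured like the one in the question and returns
// (accepted tasks, rejections observed by the submitter).
fun submitWithThrowingHandler(tasks: Int = 11): Pair<Int, Int> {
    val rejectCnt = AtomicInteger(0)

    // Count the rejection, then throw so the submitter learns about it.
    val handler = RejectedExecutionHandler { _, _ ->
        rejectCnt.incrementAndGet()
        throw RejectedExecutionException("pool saturated")
    }
    val pool = ThreadPoolExecutor(
        0, 4, 0L, TimeUnit.SECONDS, SynchronousQueue<Runnable>(), handler
    )

    // Keeps every accepted task running until all submissions are done,
    // so there is never a free thread for the later tasks.
    val release = CountDownLatch(1)
    var accepted = 0
    var seenByCaller = 0
    try {
        repeat(tasks) {
            try {
                pool.execute { release.await() }
                accepted++
            } catch (e: RejectedExecutionException) {
                seenByCaller++
            }
        }
    } finally {
        release.countDown()
        pool.shutdown()
    }

    // Every rejection counted by the handler also reached the caller.
    check(seenByCaller == rejectCnt.get())
    return accepted to seenByCaller
}
```

With the default of 11 tasks and a maximum of 4 threads, the first four submissions each start a worker and the remaining seven are rejected, which matches the seven "rejected" lines in the question.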

Hmm... Thanks, I understand now that rejected jobs need to be cancelled (or re-dispatched).
That's a bit frightening, because in 1.3.9 there was no cancellation on RejectedExecutionException. So I might have had rejected jobs that were stalled without being aware of it.

I finally found what broke in my code and why it was working with 1.3.9. I'm reporting back here in case it's useful for someone else.
In 1.3.9, when creating a dispatcher with asCoroutineDispatcher() from an ExecutorService backed by a SynchronousQueue, runnables can be rejected under some circumstances (no free thread at submission). In that case, the context from which the runnable was submitted isn't cancelled; the runnable is simply re-dispatched on the DefaultExecutor (which basically uses one dedicated thread). So rejected runnables were silently re-dispatched on a background thread. That worked fine for me, but it was pure luck.

In 1.4.0-M1, rejected runnables are re-dispatched on Dispatchers.IO and the corresponding context is cancelled. If it's acceptable to queue runnables instead of rejecting them, the problem can be solved by using a LinkedBlockingQueue for the ExecutorService. This is just one example of how to solve it.
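A minimal JDK-only sketch of the queueing workaround follows. The helper name runWithQueueingPool and the pool sizes are assumptions for illustration. In particular, corePoolSize is set to 4 here because, per the ThreadPoolExecutor documentation, a pool with an unbounded queue never creates more than corePoolSize threads, so keeping it at 0 as in the question would not give four workers.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.RejectedExecutionHandler
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not part of any library): runs `tasks` short jobs on a pool
// backed by an unbounded queue and returns (completed tasks, rejected tasks).
fun runWithQueueingPool(tasks: Int = 11): Pair<Int, Int> {
    val rejected = AtomicInteger(0)
    val completed = AtomicInteger(0)

    // Only counts rejections; it is not expected to fire while the pool is running,
    // because an unbounded LinkedBlockingQueue always accepts the task.
    val handler = RejectedExecutionHandler { _, _ -> rejected.incrementAndGet() }

    // Assumption: core == max == 4, since the unbounded queue prevents growth
    // beyond corePoolSize.
    val pool = ThreadPoolExecutor(
        4, 4, 0L, TimeUnit.SECONDS, LinkedBlockingQueue<Runnable>(), handler
    )

    repeat(tasks) {
        pool.execute {
            Thread.sleep(100) // stands in for the delay(100) in the question
            completed.incrementAndGet()
        }
    }

    // Let the queued tasks drain before reading the counters.
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    return completed.get() to rejected.get()
}
```

Tasks that find no free thread wait in the queue instead of being rejected, at the cost of unbounded queue growth if producers outpace the workers.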
