Redisson: RedisTimeoutException while processing task polled from blocking queue within the same CompletionStage

Created on 10 Jul 2020  路  3Comments  路  Source: redisson/redisson

Hello,

I created two simple tests (in Kotlin but its pretty straightforward to be translated in Java):

@Test
fun longRunningQueueTaskWithThenAccept() {
    val key = "long-running-task-queue1"
    val blockingQueue = redisson.getBlockingQueue<String>(key)
    blockingQueue.offer("task1")

    blockingQueue.pollAsync(1, TimeUnit.MINUTES).thenAccept {
        (1..100).forEach {
            val rSetCache = redisson.getSetCache<Int>("set$it")
            rSetCache.add(it)
            rSetCache.expire(100, TimeUnit.MILLISECONDS)
            val timeToLive = rSetCache.remainTimeToLive()
            assertThat(timeToLive, Matchers.greaterThan(0L))
        }
    }.toCompletableFuture().join()
}

And a second one:

@Test
fun longRunningQueueTaskWithThenCompose() {
    val key = "long-running-task-queue2"
    val blockingQueue = redisson.getBlockingQueue<String>(key)
    blockingQueue.offer("task1")

    blockingQueue.pollAsync(1, TimeUnit.MINUTES).thenCompose {
        CompletableFuture.supplyAsync {
            (1..100).forEach {
                val rSetCache = redisson.getSetCache<Int>("set$it")
                rSetCache.add(it)
                rSetCache.expire(100, TimeUnit.MILLISECONDS)
                val timeToLive = rSetCache.remainTimeToLive()
                assertThat(timeToLive, Matchers.greaterThan(0L))
            }
        }
    }.toCompletableFuture().join()
}

I had the expectation that these two tests will work more or less in the same way (i.e. I expected that both tests will pass). Surprisingly first test fails after running several times in the loop . It fails with:

Caused by: org.redisson.client.RedisTimeoutException: Command still hasn't been written into connection! Increase nettyThreads and/or retryInterval settings. Payload size in bytes: 0. Node source: NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null], connection: RedisConnection@1398444521 [redisClient=[addr=redis://172.28.0.1:32788], channel=[id: 0x3b4b93f2, L:/172.28.0.1:44394 - R:/172.28.0.1:32788], currentCommand=CommandData [promise=RedissonPromise [promise=ImmediateEventExecutor$ImmediatePromise@2056bc70(success: task1)], command=(BLPOP), params=[long-running-task-queue, 60], codec=org.redisson.codec.MarshallingCodec]], command: (PTTL), params: [set8] after 3 retry attempts

The only difference between these two tests is the async function composition. First test uses thenAccept and second one uses thenCompose, i.e. the difference is in the CompletionStage. Is it expected that first test should fail?

Redis version
6.0.4

Redisson version
3.13.1

Redisson configuration

singleServerConfig:
  idleConnectionTimeout: 10000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  password: null
  subscriptionsPerConnection: 5
  clientName: null
  address: "redis://127.0.0.1:6379"
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 24
  connectionPoolSize: 64
  database: 0
  dnsMonitoringInterval: 5000
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.FstCodec> {}
transportMode: "NIO"

Most helpful comment

You shouldn't use sync commands in async listeners since they run on netty threads. Try to use thenAcceptAsync(Consumer<? super T> action, Executor executor) method instead.

All 3 comments

You shouldn't use sync commands in async listeners since they run on netty threads. Try to use thenAcceptAsync(Consumer<? super T> action, Executor executor) method instead.

It makes sense. I wanted to get confirmation about the usage. Maybe it will be a good idea to document the usage of the API.

Was this page helpful?
0 / 5 - 0 ratings