Hello,
I created two simple tests (in Kotlin but its pretty straightforward to be translated in Java):
@Test
fun longRunningQueueTaskWithThenAccept() {
val key = "long-running-task-queue1"
val blockingQueue = redisson.getBlockingQueue<String>(key)
blockingQueue.offer("task1")
blockingQueue.pollAsync(1, TimeUnit.MINUTES).thenAccept {
(1..100).forEach {
val rSetCache = redisson.getSetCache<Int>("set$it")
rSetCache.add(it)
rSetCache.expire(100, TimeUnit.MILLISECONDS)
val timeToLive = rSetCache.remainTimeToLive()
assertThat(timeToLive, Matchers.greaterThan(0L))
}
}.toCompletableFuture().join()
}
And a second one:
@Test
fun longRunningQueueTaskWithThenCompose() {
val key = "long-running-task-queue2"
val blockingQueue = redisson.getBlockingQueue<String>(key)
blockingQueue.offer("task1")
blockingQueue.pollAsync(1, TimeUnit.MINUTES).thenCompose {
CompletableFuture.supplyAsync {
(1..100).forEach {
val rSetCache = redisson.getSetCache<Int>("set$it")
rSetCache.add(it)
rSetCache.expire(100, TimeUnit.MILLISECONDS)
val timeToLive = rSetCache.remainTimeToLive()
assertThat(timeToLive, Matchers.greaterThan(0L))
}
}
}.toCompletableFuture().join()
}
I had the expectation that these two tests will work more or less in the same way (i.e. I expected that both tests will pass). Surprisingly first test fails after running several times in the loop . It fails with:
Caused by: org.redisson.client.RedisTimeoutException: Command still hasn't been written into connection! Increase nettyThreads and/or retryInterval settings. Payload size in bytes: 0. Node source: NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null], connection: RedisConnection@1398444521 [redisClient=[addr=redis://172.28.0.1:32788], channel=[id: 0x3b4b93f2, L:/172.28.0.1:44394 - R:/172.28.0.1:32788], currentCommand=CommandData [promise=RedissonPromise [promise=ImmediateEventExecutor$ImmediatePromise@2056bc70(success: task1)], command=(BLPOP), params=[long-running-task-queue, 60], codec=org.redisson.codec.MarshallingCodec]], command: (PTTL), params: [set8] after 3 retry attempts
The only difference between these two tests is the async function composition. First test uses thenAccept and second one uses thenCompose, i.e. the difference is in the CompletionStage. Is it expected that first test should fail?
Redis version
6.0.4
Redisson version
3.13.1
Redisson configuration
singleServerConfig:
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
password: null
subscriptionsPerConnection: 5
clientName: null
address: "redis://127.0.0.1:6379"
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
connectionMinimumIdleSize: 24
connectionPoolSize: 64
database: 0
dnsMonitoringInterval: 5000
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.FstCodec> {}
transportMode: "NIO"
You shouldn't use sync commands in async listeners since they run on netty threads. Try to use thenAcceptAsync(Consumer<? super T> action, Executor executor) method instead.
It makes sense. I wanted to get confirmation about the usage. Maybe it will be a good idea to document the usage of the API.
documentation updated https://github.com/redisson/redisson/wiki/3.-Operations-execution#31-async-way
Most helpful comment
You shouldn't use sync commands in async listeners since they run on netty threads. Try to use
thenAcceptAsync(Consumer<? super T> action, Executor executor)method instead.