I'm creating many short-lived requests concurrently using the *AsyncClient, and this fails with:
java.util.concurrent.CompletionException: java.util.NoSuchElementException: HttpStreamsClientHandler#0-body-publisher
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:160)
at software.amazon.awssdk.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:143)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.exceptionOccurred(MakeAsyncHttpRequestStage.java:185)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.lambda$exceptionCaught$1(ResponseHandler.java:120)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:132)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.exceptionCaught(ResponseHandler.java:119)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at com.typesafe.netty.HandlerSubscriber.exceptionCaught(HandlerSubscriber.java:157)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:850)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:364)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1273)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1084)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: HttpStreamsClientHandler#0-body-publisher
at io.netty.channel.DefaultChannelPipeline.getContextOrDie(DefaultChannelPipeline.java:1080)
at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:434)
at com.typesafe.netty.http.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:328)
at com.typesafe.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:189)
at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
... 28 more
This can be reproduced by creating a do-nothing lambda function called "test" in your AWS account, and then running this JUnit test a couple of times:
package repro.bug;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvocationType;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
public class CallLambdaLots {
public static final int CONCURRENCY = 1000;
LambdaAsyncClient lambda = LambdaAsyncClient.create();
@Test
@SuppressWarnings("unchecked")
public void joinAllAtOnce() {
CompletableFuture.allOf(
IntStream.range(1, CONCURRENCY)
.mapToObj(this::callLambda)
.toArray(
size ->
(CompletableFuture<InvokeResponse>[])
Array.newInstance(CompletableFuture.class, size)))
.join();
}
@Test
public void joinOneAtATime() {
IntStream.range(1, CONCURRENCY)
.mapToObj(this::callLambda)
.collect(Collectors.toList())
.forEach(CompletableFuture::join);
}
private CompletableFuture<InvokeResponse> callLambda(int payload) {
return lambda.invoke(
InvokeRequest.builder()
.functionName("test")
.invocationType(InvocationType.Event)
.payload(ByteBuffer.wrap(Integer.toString(payload).getBytes()))
.build());
}
}
On machines with lots of cores, you have to crank CONCURRENCY up quite a lot to hit the race condition, but from a small lambda function with 1-2 cores I hit this even with 50-100 concurrent requests.
Thank you for bug report! Can you tell me what version of the SDK you used?
Ah, of course, should have included that. Here's the build.gradle I used:
plugins {
id "io.spring.dependency-management" version "1.0.3.RELEASE"
id "java"
}
group 'repro.bug'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8
dependencyManagement {
imports {
mavenBom 'software.amazon.awssdk:bom:2.0.0-preview-4'
}
}
repositories {
mavenCentral()
}
dependencies {
compile 'software.amazon.awssdk:lambda'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
And here's the exact set of dependencies used:
testRuntimeClasspath - Runtime classpath of source set 'test'.
+--- software.amazon.awssdk:lambda: -> 2.0.0-preview-4
| +--- org.slf4j:slf4j-api:1.7.25
| +--- software.amazon.awssdk:core:2.0.0-preview-4
| | +--- org.slf4j:slf4j-api:1.7.25
| | +--- software.amazon.ion:ion-java:1.0.2
| | +--- com.fasterxml.jackson.core:jackson-databind:2.8.8
| | | +--- com.fasterxml.jackson.core:jackson-annotations:2.8.0 -> 2.8.8
| | | \--- com.fasterxml.jackson.core:jackson-core:2.8.8
| | +--- com.fasterxml.jackson.jr:jackson-jr-objects:2.9.0.pr4
| | | \--- com.fasterxml.jackson.core:jackson-core:2.9.0.pr4 -> 2.8.8
| | +--- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.8.8
| | | \--- com.fasterxml.jackson.core:jackson-core:2.8.8
| | +--- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.8.8
| | | +--- com.fasterxml.jackson.core:jackson-core:2.8.8
| | | \--- com.fasterxml.jackson.core:jackson-databind:2.8.8 (*)
| | +--- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.8.8
| | | +--- com.fasterxml.jackson.core:jackson-annotations:2.8.0 -> 2.8.8
| | | +--- com.fasterxml.jackson.core:jackson-core:2.8.8
| | | \--- com.fasterxml.jackson.core:jackson-databind:2.8.8 (*)
| | +--- software.amazon.awssdk:http-client-spi:2.0.0-preview-4
| | | +--- software.amazon.awssdk:annotations:2.0.0-preview-4
| | | +--- software.amazon.awssdk:utils:2.0.0-preview-4
| | | | +--- org.slf4j:slf4j-api:1.7.25
| | | | \--- software.amazon.awssdk:annotations:2.0.0-preview-4
| | | \--- org.reactivestreams:reactive-streams:1.0.0.final -> 1.0.0
| | \--- software.amazon.awssdk:utils:2.0.0-preview-4 (*)
| +--- software.amazon.awssdk:apache-client:2.0.0-preview-4
| | +--- software.amazon.awssdk:http-client-spi:2.0.0-preview-4 (*)
| | +--- org.slf4j:slf4j-api:1.7.25
| | +--- software.amazon.awssdk:utils:2.0.0-preview-4 (*)
| | +--- software.amazon.awssdk:annotations:2.0.0-preview-4
| | \--- org.apache.httpcomponents:httpclient:4.5.2
| | +--- org.apache.httpcomponents:httpcore:4.4.4
| | +--- commons-logging:commons-logging:1.2
| | \--- commons-codec:commons-codec:1.9
| +--- software.amazon.awssdk:netty-nio-client:2.0.0-preview-4
| | +--- software.amazon.awssdk:http-client-spi:2.0.0-preview-4 (*)
| | +--- software.amazon.awssdk:utils:2.0.0-preview-4 (*)
| | +--- io.netty:netty-codec-http:4.1.13.Final
| | | \--- io.netty:netty-codec:4.1.13.Final
| | | \--- io.netty:netty-transport:4.1.13.Final
| | | +--- io.netty:netty-buffer:4.1.13.Final
| | | | \--- io.netty:netty-common:4.1.13.Final
| | | \--- io.netty:netty-resolver:4.1.13.Final
| | | \--- io.netty:netty-common:4.1.13.Final
| | +--- io.netty:netty-handler:4.1.13.Final
| | | +--- io.netty:netty-buffer:4.1.13.Final (*)
| | | +--- io.netty:netty-transport:4.1.13.Final (*)
| | | \--- io.netty:netty-codec:4.1.13.Final (*)
| | +--- io.netty:netty-transport-native-epoll:4.1.13.Final
| | | +--- io.netty:netty-common:4.1.13.Final
| | | +--- io.netty:netty-buffer:4.1.13.Final (*)
| | | +--- io.netty:netty-transport-native-unix-common:4.1.13.Final
| | | | +--- io.netty:netty-common:4.1.13.Final
| | | | \--- io.netty:netty-transport:4.1.13.Final (*)
| | | \--- io.netty:netty-transport:4.1.13.Final (*)
| | +--- com.typesafe.netty:netty-reactive-streams-http:2.0.0
| | | +--- com.typesafe.netty:netty-reactive-streams:2.0.0
| | | | +--- io.netty:netty-handler:4.1.13.Final (*)
| | | | \--- org.reactivestreams:reactive-streams:1.0.0
| | | \--- io.netty:netty-codec-http:4.1.13.Final (*)
| | \--- org.slf4j:slf4j-api:1.7.25
| \--- io.burt:jmespath-jackson:0.2.0
| +--- io.burt:jmespath-core:0.2.0
| \--- com.fasterxml.jackson.core:jackson-databind:2.7.0 -> 2.8.8 (*)
\--- junit:junit:4.12
\--- org.hamcrest:hamcrest-core:1.3
Also, I ran the repro code on Mac OS X, but the same thing happens with the epoll driver on Amazon Linux.
Was able to reproduce this in 2.0.0-preview-6. Will continue to investigate.
The issue seems to be that the channels can get left in a bad state before being released back into the pool causing an error with HttpStreamsHandler when a subsequent request is made with the pooled channel.
The problem doesn't seem to manifest if we change ResponseHandler#finalizeRequest to unconditionally closeAndRelease.
Quick update:
So far after more testing and debugging, the problems seems to be unique to Lambda's invoke call __with__ InvocationType.EVENT. It doesn't seem to occur with other event types on invoke. Also, I'm unable to reproduce the the error using another service like DynamoDB in V2, and I'm not seeing any errors in V1 for Lambda.
The cause of the error appears to be that for some requests, after the client has finished flushing the request to Lambda, we immediately see a LastHttpContent message being read from the HTTP codec; no previous Response message. This causes HttpStreamsClientHandler to attempt to remove a non-existent -body-publisher handler.
One thing I've found with the async client is that replacing the HttpClientCodec with a new one each time a channel is leased from the pool fixes issue, so maybe there is some edge case causing the codec to enter a bad state after a response from the server is received and before a new one is written.
Quick addition: I'm also unable to reproduce the behavior in V2 using a mock local server (that always sends the same response).
I seem to be running into this with S3 pretty consistently.
I am using v7 of the JDK. Code below is in scala.
""software.amazon.awssdk" % "s3" % "2.0.0-preview-7","
Here is how i am using S3
s3AsyncClient.putObject(putObjReq, new MyPublisher(config.objSize * 1024))
my producer is below. This works for a single occurrence, but fails when multiple requests are queued rapidly.
// Used to generate an infinite stream if required
class MyPublisher(streamSz: Long) extends AsyncRequestProvider {
def contentLength: Long = streamSz
def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {
println("New subscription")
s.onSubscribe(new MySubscription(s, streamSz))
}
object MySubscription {
val SIZE: Int = 1024;
val initBuf: Array[Byte] = (for (i <- 0 to (SIZE - 1)) yield 'b'.toByte).toArray
val s3Buffer: ByteBuffer = ByteBuffer.allocateDirect(SIZE)
.put(initBuf, 0, SIZE)
.asReadOnlyBuffer()
}
class MySubscription(s: Subscriber[_ >: ByteBuffer], sz: Long) extends Subscription {
var currPos: Long = 0
val localBuf = ByteBuffer.allocate(sz.toInt)
//MySubscription.s3Buffer.duplicate()
var complete : AtomicBoolean = new AtomicBoolean(false)
def cancel = {}
def request(b: Long) = if (currPos != sz) {
val c = localBuf.capacity()
val r = if ((sz - currPos) < c) (sz - currPos).toInt else c
//println("limit: %d, currPos: %d, sz: %d, b:%d".format(r,currPos,sz,b))
localBuf.position(0)
localBuf.limit(r)
currPos += r
s.onNext(localBuf)
//println("Updated currPos:%d, b:%d".format(currPos,b))
if (b > 1) request(b - 1)
} else {
if(complete.getAndSet(true) == false) {
s.onComplete()
//println("s is complete")
}
}
}
}
This is my stack trace
[info] 15:58:29.041 ERROR s.a.a.h.n.n.internal.ResponseHandler Exception processing request: software.amazon.awssdk.http.DefaultSdkHttpFullRequest@5fb65596
[info] java.util.NoSuchElementException: HttpStreamsClientHandler#0-body-publisher
[info] at io.netty.channel.DefaultChannelPipeline.getContextOrDie(DefaultChannelPipeline.java:1089)
[info] at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:443)
[info] at com.typesafe.netty.http.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:328)
[info] at com.typesafe.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:189)
[info] at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
[info] at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148)
[info] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
[info] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
[info] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
[info] at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
[info] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
[info] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
[info] at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
[info] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
[info] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
[info] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
[info] at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1379)
[info] at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1158)
[info] at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1193)
[info] at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
[info] at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
[info] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
[info] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
[info] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
[info] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
[info] at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
[info] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
[info] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
[info] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
[info] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
[info] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
[info] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
[info] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
[info] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
[info] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
[info] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
[info] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
[info] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
[info] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
[info] at java.lang.Thread.run(Thread.java:745)
@vardhanv Thanks for the additional repro case! This will come in handy for debugging.
This issue can be simplified as follows here:
I boiled down the reproducer by @vardhanv even further into https://github.com/raboof/awssdk-async (it doees take a number of tries to hit the problem for me though).
Awesome! 馃帀
Awesome Dongie
I'm still hitting this on 2.3.9 SDK with concurrent uploads to S3 with the Async Client. It happens less when I buffer the requests into smaller batches and block on the batch to complete. Example stack trace:
java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException
at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:61)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:50)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryErrorIfNeeded(AsyncRetryableStage.java:167)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:119)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:104)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.onError(MakeAsyncHttpRequestStage.java:236)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:228)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.lambda$writeRequest$6(NettyRequestExecutor.java:182)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:485)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:162)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.writeRequest(NettyRequestExecutor.java:167)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequest(NettyRequestExecutor.java:157)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:124)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:94)
at software.amazon.awssdk.http.nio.netty.internal.utils.BetterFixedChannelPool$AcquireListener.operationComplete(BetterFixedChannelPool.java:324)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:485)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:94)
at io.netty.channel.pool.SimpleChannelPool.notifyHealthCheck(SimpleChannelPool.java:245)
at io.netty.channel.pool.SimpleChannelPool.doHealthCheck(SimpleChannelPool.java:226)
at io.netty.channel.pool.SimpleChannelPool.acquireHealthyFromPoolOrNew(SimpleChannelPool.java:194)
at io.netty.channel.pool.SimpleChannelPool.acquire(SimpleChannelPool.java:164)
at software.amazon.awssdk.http.nio.netty.internal.utils.BetterFixedChannelPool.acquire0(BetterFixedChannelPool.java:168)
at software.amazon.awssdk.http.nio.netty.internal.utils.BetterFixedChannelPool.acquire(BetterFixedChannelPool.java:137)
at software.amazon.awssdk.http.nio.netty.internal.http2.HttpOrHttp2ChannelPool.acquire0(HttpOrHttp2ChannelPool.java:76)
at software.amazon.awssdk.http.nio.netty.internal.http2.HttpOrHttp2ChannelPool.lambda$acquire$0(HttpOrHttp2ChannelPool.java:70)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)
Caused by: software.amazon.awssdk.core.exception.SdkClientException: null
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97)
at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:118)
... 42 common frames omitted
Caused by: java.lang.IllegalArgumentException: Duplicate handler name: HttpStreamsClientHandler#0-body-subscriber
at io.netty.channel.DefaultChannelPipeline.checkDuplicateName(DefaultChannelPipeline.java:1101)
at io.netty.channel.DefaultChannelPipeline.filterName(DefaultChannelPipeline.java:302)
at io.netty.channel.DefaultChannelPipeline.addAfter(DefaultChannelPipeline.java:319)
at io.netty.channel.DefaultChannelPipeline.addAfter(DefaultChannelPipeline.java:308)
at com.typesafe.netty.http.HttpStreamsHandler.unbufferedWrite(HttpStreamsHandler.java:287)
at com.typesafe.netty.http.HttpStreamsHandler.flushNext(HttpStreamsHandler.java:334)
at com.typesafe.netty.http.HttpStreamsHandler.write(HttpStreamsHandler.java:227)
at com.typesafe.netty.http.HttpStreamsClientHandler.write(HttpStreamsClientHandler.java:30)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:801)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:837)
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1071)
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:304)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.writeRequest(NettyRequestExecutor.java:166)
... 27 common frames omitted
Example code using Reactor library:
Flux.fromIterable(siteIndices.entrySet())
.buffer(MAX_CONCURRENT_REQ)
.doOnError(t -> logger.error(t.getMessage(), t))
.subscribe(siteIndicesBatch -> {
// Block on a batch to complete up to a timeout
final int batchSize = siteIndicesBatch.size();
final CountDownLatch latch = new CountDownLatch(batchSize);
Flux.fromIterable(siteIndicesBatch)
.doOnError(t -> logger.error(t.getMessage(), t))
.map(e -> {
final RadarSite radarSite = e.getKey();
final Set<String> fileSet = e.getValue();
// Filter old or invalid feed file names
fileSet.removeIf(f -> !feedIndices.isValid(f, minAllowedTime));
final String indexData = String.join("\n", fileSet);
final AsyncRequestBody reqBody = AsyncRequestBody.fromString(indexData, Charsets.UTF_8);
final String s3Key = feedPaths.indexFilePath(radarSite);
final PutObjectRequest putObjReq = PutObjectRequest.builder()
.bucket(bucket)
.key(s3Key)
.contentEncoding("UTF-8")
.contentType("text/plain")
.build();
logger.debug("Adding index file: bucket [{}], path [{}]", bucket, s3Key);
return s3AsyncClient.putObject(putObjReq, reqBody)
.handleAsync((resp, t) -> {
latch.countDown();
if(t != null) {
logger.error(t.getMessage(), t);
}
return resp;
});
})
.filter(Objects::nonNull)
.subscribe();
try {
if(!latch.await(30L, TimeUnit.SECONDS)) {
logger.warn("Timeout waiting batch of size {} to complete", batchSize);
}
} catch (InterruptedException ignored) {}
});
I am using SDK version 2.9.4 and still have the same problem
That issue still happens in version 2.10.41
I'm getting this issue on 2.13.33
Most helpful comment
I'm getting this issue on 2.13.33