Describe the bug
I have a service method that SSE clients can use to subscribe to a stream; it returns a Multi. I have a second method that can be used to emit data, and the SSE clients receive it. Everything works as expected, except that after a client has disconnected, a warning is logged: "Failed to mark a promise as failure because it has failed already: DefaultChannelPromise failure: java.nio.channels.ClosedChannelException".
Expected behavior
No warning should be logged, as nothing abnormal has actually happened.
Actual behavior
A warning is logged, as shown above.
To Reproduce
Steps to reproduce the behavior:
/**
 * Allows a caller to subscribe to changes of a certain type of object.
 */
@GET
@Path("/changes/{subscriberId}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<AnObject> changes(@PathParam("subscriberId") String subscriberId, @QueryParam("type") List<String> type) {
    SubscriberModel subscriberModel = SUBSCRIBERS.computeIfAbsent(subscriberId, k -> new SubscriberModel());
    subscriberModel.setId(subscriberId);
    return Multi.createFrom()
        .emitter(e -> {
            subscriberModel.setEmitter(e);
            e.onTermination(() -> {
                logger.info("Removing subscriber " + subscriberId);
                e.complete();
                SUBSCRIBERS.remove(subscriberId);
                // even though the above works nicely, an exception is logged by Quarkus afterwards.
                // see https://stackoverflow.com/questions/61694510/how-to-handle-a-closedchannelexception-on-a-reactive-streams-http-connection-clo
            });
        }, BackPressureStrategy.ERROR);
}
/**
 * Test method to emit an object to ALL subscribers.
 */
@GET
@Path("/emit")
@Produces(MediaType.APPLICATION_JSON)
public Response emit() {
    logger.info("emitting to " + SUBSCRIBERS.size() + " subscribers");
    SUBSCRIBERS.values().forEach(sm ->
        sm.emit(new AnObject(UUID.randomUUID(), "anObject"))
    );
    return Response.noContent().build();
}
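The SubscriberModel referenced above is not shown in the snippet; purely as an illustration (this is NOT the actual class, see the linked repository for the real implementation), a minimal sketch might look like the following, using a plain java.util.function.Consumer as a stand-in for Mutiny's MultiEmitter so the sketch stays self-contained:

```java
import java.util.UUID;
import java.util.function.Consumer;

// Minimal stand-in for the emitted payload type used in the snippets above.
class AnObject {
    final UUID id;
    final String name;

    AnObject(UUID id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Hypothetical sketch of SubscriberModel. The real class presumably holds a
// Mutiny MultiEmitter<? super AnObject>; a Consumer is used here instead.
class SubscriberModel {
    private String id;
    private Consumer<AnObject> emitter;

    void setId(String id) { this.id = id; }

    String getId() { return id; }

    void setEmitter(Consumer<AnObject> emitter) { this.emitter = emitter; }

    // Push an object to the subscribed SSE client, if one is attached.
    void emit(AnObject o) {
        if (emitter != null) {
            emitter.accept(o);
        }
    }
}
```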
1. Subscribe using curl: curl -v -X GET localhost:8086/objects/changes/aClient
2. Hit Ctrl+C to close that client. Nothing happens yet in Quarkus, but that is OK, because it is quite standard to use a heartbeat to deal with that; some SSE servlet implementations also require this.
3. Emit an event using curl: curl -v -X GET localhost:8086/objects/emit
4. The subscriber is correctly removed from the model.
5. The unexpected warning is logged.
Configuration
# the following line is not really used, as the native libraries are not depended upon in the pom.
quarkus.vertx.prefer-native-transport=true
quarkus.http.port=8086
quarkus.log.file.enable=true
quarkus.log.file.level=INFO
quarkus.log.file.format=%d{HH:mm:ss} %-5p [%c{2.}] (%t) %s%e%n
quarkus.log.category."ch.maxant.kdc.objects".level=DEBUG
Screenshots
2020-05-10 14:26:12,366 INFO [ch.max.kdc.obj.ObjectResource] (vert.x-eventloop-thread-6) Removing subscriber aClient
2020-05-10 14:30:59,200 WARN [io.net.cha.AbstractChannelHandlerContext] (vert.x-eventloop-thread-6) Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@22021ebc(failure: java.nio.channels.ClosedChannelException), unnotified cause: java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:867)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Environment (please complete the following information):
Output of uname -a or ver:
Output of java -version:
openjdk 11.0.7 2020-04-14
OpenJDK Runtime Environment (build 11.0.7+10-post-Ubuntu-2ubuntu219.10)
OpenJDK 64-Bit Server VM (build 11.0.7+10-post-Ubuntu-2ubuntu219.10, mixed mode, sharing)
GraalVM version (if different from Java):
Quarkus version or git rev:
1.4.2.Final
Build tool (ie. output of mvnw --version or gradlew --version):
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /snap/intellij-idea-community/226/plugins/maven/lib/maven3
Java version: 11.0.7, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "5.3.0-51-generic", arch: "amd64", family: "unix"
Additional context
(Add any other context about the problem here.)
a working implementation of this can be found here: https://github.com/maxant/kafka-data-consistency/blob/master/objects/src/main/java/ch/maxant/kdc/objects/ObjectResource.java
Hi, I'm also getting many similar crashes. Is there any workaround for this?
This is my crash traces https://sentry.io/share/issue/2ed01930df494218aca52e49cb1cf237/
@evanchooly can you please check this
@itboy87 I don't know why you tagged me specifically; this is not my area of expertise.
Sorry, I thought maybe you are dealing with issues on GitHub. Can you please tag the relevant developer?
I have the same issue
@GET
@Path("event")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.TEXT_PLAIN)
public Multi<String> listen() {
    return vertx.eventBus().localConsumer(eventChannel).toMulti().map(objectMessage -> objectMessage.body().toString());
}
@FroMage is this one for you?
Well, perhaps, but I don't quite understand this issue. Perhaps @vietj can help me make sense of it?
I think that exception occurs only when the client closes the connection before the request completes, so I don't think it's a bug, just a warning. But it would be better if the log were clearer, e.g. "connection closed by client".
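If the warning is indeed benign, one possible stop-gap (an assumption on my part, not an official fix) would be to raise the log threshold for the Netty category that emits it, using the same category mechanism already shown in the configuration above. The full category name is inferred from the abbreviated logger name in the log output:

```properties
# Hypothetical workaround: suppress the benign WARN by raising the threshold
# for the category seen in the log (io.net.cha.AbstractChannelHandlerContext,
# abbreviated). This hides the symptom; it does not fix the underlying bug.
quarkus.log.category."io.netty.channel.AbstractChannelHandlerContext".level=ERROR
```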
OK I can reproduce. This doesn't happen on Undertow.
@patriot1burke do you remember why you added the VertxBufferImpl class? Using Buffer.buffer(ByteBuf) makes this issue go away, and we would have one less class to maintain. I'm sure you must have had a reason to write your own rather than use it; do you remember?
@FroMage If you look at BufferImpl.getByteBuf() it does a copy of the underlying netty ByteBuf. VertxBufferImpl does not.
There are a number of places in the Quarkus code which access the underlying ByteBuf.
FYI, undertow has the same VertxBufferImpl implementation and I derived resteasy's implementation from that.
Ah OK. Except apparently there's a bug somewhere wrt buffer deallocation, which leads to those warnings. How can we go about fixing that, if we can't take Vert.x's impl which doesn't have this bug?
I don't know enough about vert.x or netty to give you a good answer.
It looks to me like RESTEasy over raw Vert.x is not doing reference counting correctly; it doesn't really have anything to do with the fact that VertxBufferImpl exists as a class.
It's possible that both are related, in that vert.x will properly reference count its own buffer impl, and not ours?
I do see in VertxBlockingOutput that I am releasing the ByteBuf on error. This "bug" shows up when the connection is closed in the middle of a request? If so, the fact that VertxBlockingOutput releases the ByteBuf on errors might be the issue.
I could test this, but I don't know if we should release it or not.
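To make the failure mode concrete: Netty buffers carry a reference count, and releasing a buffer whose count has already reached zero fails with IllegalReferenceCountException("refCnt: 0, decrement: 1"), exactly as in the stack trace above. The following toy class (NOT Netty's actual implementation, just an illustration of the semantics) sketches why a double release fails:

```java
// Toy illustration of Netty-style reference counting. A buffer starts with
// refCnt = 1; every release() decrements it, and a release on a count that is
// already 0 corresponds to the "refCnt: 0, decrement: 1" error in the trace.
class ToyRefCounted {
    private int refCnt = 1;

    int refCnt() {
        return refCnt;
    }

    // Returns true if this release deallocated the buffer (count reached 0).
    boolean release() {
        if (refCnt == 0) {
            // Netty throws IllegalReferenceCountException at this point.
            throw new IllegalStateException("refCnt: 0, decrement: 1");
        }
        refCnt--;
        return refCnt == 0;
    }
}
```

If both RESTEasy and Vert.x release the same buffer (once on error, once on connection close), the second release hits exactly this branch.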
I don't think I implemented the ByteBuf release code here (@johnaohara did):
and definitely not here (that's your code, Stef):
This may be the error: if you look at the "bug" report's stack trace, it's trying to release a buffer whose reference count is already at 0.
My commit, but code from @stuartwdouglas ;)
I have the same issue and added quarkus-undertow dependency for workaround
I have no clue why, but adding this dependency solved my problem too.
As @FroMage said, it looks like there's a buffer deallocation bug causing these warnings, and Undertow does not have it in the implementation it uses under the hood.
I had the same issue. Adding quarkus-undertow dependency worked for me too. Thanks!