Quarkus: Exception logged by quarkus after SSE client is closed - "Failed to mark a promise as failure because it has failed already: DefaultChannelPromise failure: java.nio.channels.ClosedChannelException"

Created on 10 May 2020 · 24 comments · Source: quarkusio/quarkus

Describe the bug
I have a service method which SSE clients can use to subscribe to a stream; it returns a Multi. I have a second method which can be used to emit data, and the SSE clients receive it. Everything works as expected, except that after a client has disconnected, a warning is logged: "Failed to mark a promise as failure because it has failed already: DefaultChannelPromise failure: java.nio.channels.ClosedChannelException".

Expected behavior
No warning should be logged, as nothing abnormal has actually happened.

Actual behavior
A warning is logged, as shown above.

To Reproduce
Steps to reproduce the behavior:

  1. Create a method which clients can use to subscribe to SSEs (the SubscriberModel helper it uses is sketched after these steps):
    /**
     * allows a caller to subscribe to changes of a certain type of object
     */
    @GET
    @Path("/changes/{subscriberId}")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Multi<AnObject> changes(@PathParam("subscriberId") String subscriberId, @QueryParam("type") List<String> type) {
        SubscriberModel subscriberModel = SUBSCRIBERS.computeIfAbsent(subscriberId, k -> new SubscriberModel());
        subscriberModel.setId(subscriberId);
        return Multi.createFrom()
                .emitter(e -> {
                    subscriberModel.setEmitter(e);
                    e.onTermination(() -> {
                        logger.info("Removing subscriber " + subscriberId);
                        e.complete();
                        SUBSCRIBERS.remove(subscriberId);

                        // even though the above works nicely, there is an exception logged by quarkus, afterwards.
                        // see https://stackoverflow.com/questions/61694510/how-to-handle-a-closedchannelexception-on-a-reactive-streams-http-connection-clo
                        //
                    });
                }, BackPressureStrategy.ERROR);
    }
  2. Create a method used to emit data to all subscribers:
    /**
     * test method in order to emit an object to ALL subscribers
     */
    @GET
    @Path("/emit")
    @Produces(MediaType.APPLICATION_JSON)
    public Response emit() {
        logger.info("emitting to " + SUBSCRIBERS.size() + " subscribers");
        SUBSCRIBERS.values().forEach(sm ->
                sm.emit(new AnObject(UUID.randomUUID(), "anObject"))
        );
        return Response.noContent().build();
    }
  3. Subscribe using curl: curl -v -X GET localhost:8086/objects/changes/aClient

  4. Hit Ctrl+C to close that client. Nothing happens yet in Quarkus, but that is OK, because it is quite standard to use a heartbeat to deal with this (see the heartbeat sketch after these steps); some SSE servlet implementations also require one.

  5. Emit an event using curl: curl -v -X GET localhost:8086/objects/emit

  6. The subscriber is correctly removed from the model.

  7. The unexpected warning is logged.
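
Note: the SubscriberModel class and the SUBSCRIBERS map used in steps 1 and 2 are not shown in this report. Purely to make the steps self-contained, here is a minimal hypothetical sketch of what such a helper might look like; all names and details below are assumptions, not the actual code:

    import io.smallrye.mutiny.subscription.MultiEmitter;

    public class SubscriberModel {

        // hypothetical registry used by the two resource methods, e.g.:
        // static final Map<String, SubscriberModel> SUBSCRIBERS = new ConcurrentHashMap<>();

        private String id;
        private volatile MultiEmitter<? super AnObject> emitter;

        public void setId(String id) {
            this.id = id;
        }

        public void setEmitter(MultiEmitter<? super AnObject> emitter) {
            this.emitter = emitter;
        }

        public void emit(AnObject anObject) {
            MultiEmitter<? super AnObject> e = this.emitter;
            if (e != null && !e.isCancelled()) {
                e.emit(anObject); // pushes the object to the connected SSE client
            }
        }
    }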

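Regarding the heartbeat mentioned in step 4: a common approach with Mutiny is to merge the emitter-backed stream with a periodic tick, so that a write is attempted even on an idle connection and a closed client tears the stream down promptly. A rough sketch only; the ten-second period and the "HEARTBEAT" marker object are made up for illustration:

    import java.time.Duration;
    import java.util.UUID;
    import io.smallrye.mutiny.Multi;

    // inside changes(), instead of returning the emitter-backed Multi directly:
    Multi<AnObject> data = Multi.createFrom().<AnObject>emitter(e -> {
        subscriberModel.setEmitter(e);
        e.onTermination(() -> SUBSCRIBERS.remove(subscriberId));
    });
    // periodic tick mapped to a marker object, merged with the real data
    Multi<AnObject> heartbeat = Multi.createFrom().ticks().every(Duration.ofSeconds(10))
            .map(tick -> new AnObject(UUID.randomUUID(), "HEARTBEAT"));
    return Multi.createBy().merging().streams(data, heartbeat);
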
Configuration

# the following line is not really used, as the native libraries are not depended upon in the pom.
quarkus.vertx.prefer-native-transport=true
quarkus.http.port=8086

quarkus.log.file.enable=true
quarkus.log.file.level=INFO
quarkus.log.file.format=%d{HH:mm:ss} %-5p [%c{2.}] (%t) %s%e%n
quarkus.log.category."ch.maxant.kdc.objects".level=DEBUG

Screenshots

2020-05-10 14:26:12,366 INFO  [ch.max.kdc.obj.ObjectResource] (vert.x-eventloop-thread-6) Removing subscriber aClient
2020-05-10 14:30:59,200 WARN  [io.net.cha.AbstractChannelHandlerContext] (vert.x-eventloop-thread-6) Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@22021ebc(failure: java.nio.channels.ClosedChannelException), unnotified cause: java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
    at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
    at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
    at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
    at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:867)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
    at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
    at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

Environment (please complete the following information):

  • Output of uname -a or ver:
    Linux hades-ubuntu 5.3.0-51-generic #44-Ubuntu SMP Wed Apr 22 21:09:44 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
  • Output of java -version:
    openjdk 11.0.7 2020-04-14
    OpenJDK Runtime Environment (build 11.0.7+10-post-Ubuntu-2ubuntu219.10)
    OpenJDK 64-Bit Server VM (build 11.0.7+10-post-Ubuntu-2ubuntu219.10, mixed mode, sharing)

  • GraalVM version (if different from Java):

  • Quarkus version or git rev:
    1.4.2.Final

  • Build tool (ie. output of mvnw --version or gradlew --version):
    Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
    Maven home: /snap/intellij-idea-community/226/plugins/maven/lib/maven3
    Java version: 11.0.7, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
    Default locale: en_US, platform encoding: UTF-8
    OS name: "linux", version: "5.3.0-51-generic", arch: "amd64", family: "unix"


Label: kind/bug


All 24 comments

Hi, I'm also getting a lot of similar crashes. Is there any workaround for this?

These are my crash traces: https://sentry.io/share/issue/2ed01930df494218aca52e49cb1cf237/
@evanchooly, can you please check this?

@itboy87 I don't know why you tagged me specifically. This is not my area of expertise.

Sorry, I thought maybe you were dealing with issues on GitHub. Can you please tag the relevant developer?


I have the same issue

@GET @Path("event") @Produces(MediaType.SERVER_SENT_EVENTS) @SseElementType(MediaType.TEXT_PLAIN) public Multi<String> listen() { return vertx.eventBus().localConsumer(eventChannel).toMulti().map(objectMessage -> objectMessage.body().toString()); }

@FroMage, is this one for you?

Well, perhaps, but I don't quite understand this issue. Perhaps @vietj can help me make sense of it?

I think that exception occurs only when the client closes the connection before the request completes. So I don't think it's a bug, just a warning. But it would be better if the log were clearer, e.g. "connection closed by client".

OK I can reproduce. This doesn't happen on Undertow.

@patriot1burke do you remember why you added the VertxBufferImpl class? Using Buffer.buffer(ByteBuf) makes this issue go away and we have one less class to maintain. I'm sure you must have had a reason to not use it and write your own, do you remember?

@FroMage If you look at BufferImpl.getByteBuf(), it makes a copy of the underlying Netty ByteBuf. VertxBufferImpl does not.

https://github.com/eclipse-vertx/vert.x/blob/3.9/src/main/java/io/vertx/core/buffer/impl/BufferImpl.java#L472

There are a number of places in the Quarkus code which access the underlying ByteBuf.

FYI, Undertow has the same VertxBufferImpl implementation, and I derived RESTEasy's implementation from that.

https://github.com/quarkusio/quarkus-http/blob/master/vertx/src/main/java/io/undertow/vertx/VertxBufferImpl.java

Ah OK. Except apparently there's a bug somewhere with regard to buffer deallocation, which leads to those warnings. How can we go about fixing that, if we can't take Vert.x's impl, which doesn't have this bug?

I don't know enough about vert.x or netty to give you a good answer.

It looks to me like RESTEasy over raw Vert.x is not doing reference counting correctly; it doesn't really have anything to do with the fact that VertxBufferImpl exists as a class.

It's possible that both are related, in that Vert.x will properly reference-count its own buffer impl, but not ours?

I do see in VertxBlockingOutput that I am releasing the ByteBuf on error. This "bug" shows up when the connection is closed in the middle of a request? If so, the fact that VertxBlockingOutput releases the ByteBuf on errors might be the issue.

I could test this, but I don't know if we should release it or not.

I don't think I implemented the ByteBuf release code here (@johnaohara did):

https://github.com/quarkusio/quarkus/blob/master/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxBlockingOutput.java#L84

and definitely not here (that's your code, Stef):

https://github.com/quarkusio/quarkus/blob/master/extensions/resteasy/runtime/src/main/java/io/quarkus/resteasy/runtime/standalone/VertxBlockingOutput.java#L112

This may be the error: if you look at the stack trace in the "bug" report, it's trying to release a buffer whose reference count is already at 0.
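
To illustrate that failure mode concretely, here is a minimal standalone Netty snippet (an illustration, not code from Quarkus or this issue): releasing a buffer again once its reference count has already dropped to 0 throws exactly the IllegalReferenceCountException (refCnt: 0, decrement: 1) seen in the stack trace above.

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.charset.StandardCharsets;

    public class RefCountDemo {
        public static void main(String[] args) {
            ByteBuf buf = Unpooled.copiedBuffer("data", StandardCharsets.UTF_8);
            System.out.println(buf.refCnt()); // prints 1
            buf.release(); // refCnt drops to 0 and the buffer is deallocated
            buf.release(); // throws IllegalReferenceCountException: refCnt: 0, decrement: 1
        }
    }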

My commit, but code from @stuartwdouglas ;)

I have the same issue and added the quarkus-undertow dependency as a workaround.

> I have the same issue and added the quarkus-undertow dependency as a workaround.

I have no clue why, but adding this dependency solved my problem too.

As @FroMage said, it looks like there's a buffer deallocation bug causing these warnings, and Undertow does not have this bug in the implementation it uses under the hood.

I had the same issue. Adding the quarkus-undertow dependency worked for me too. Thanks!
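
For anyone landing here: the workaround mentioned in the comments above means adding the Undertow-based extension to the pom, for example (the version is managed by the Quarkus BOM):

    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-undertow</artifactId>
    </dependency>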
