Graylog2-server: Graylog AMQP consumer stops acking messages after reconnect

Created on 12 Jan 2018  路  15Comments  路  Source: Graylog2/graylog2-server

Expected Behavior

It should reconnect and start consuming and acking messages as if nothing happened

Current Behavior

If a Graylog AMQP consumer is somehow disconnected from RabbitMQ, it does not properly resume consuming and acking messages. Instead, it consumes $prefetch number of messages but does not ack them. E.g. my queue is no longer being consumed.
Secondly, it spawns another consumer with the exact behaviour, consumes, but does not ack.
Thirdly (possibly related), each time I forcefully close the connection (using rabbitmqadmin close connection), the closed connection reappears and a new is made. Effectively doubling the number of connections that do not work. (I've been able to get up to 6000 connections from graylog to rabbitmq using this method, none of them acked anything.)

Possible Solution

Don't know how to fix or where to look, as I'm unfamiliar with the graylog codebase. The reason why is pretty obvious in my opinion. Network connections will fail and no connection has 100% uptime. Being able to resume normal operations without human intervention makes operators happy.

Steps to Reproduce (for bugs)

In RabbitMQ one connection (with a proper channel) is visible. Data sent to RabbitMQ
ends up in Graylog as expected. So far, so good.

However, when the Graylog <-> RabbitMQ connection is disconnected*, it is restored,
but no longer ack鈥檚 messages (resulting in 100 (default prefetch) unacked messages).
Secondary, another connection is made from the Graylog host to RabbitMQ. This connection also
does not ack messages (another 100 unacked messages linger around).

*disconnecting via shutting down network interface, rabbitmq restart or any other disruptive way.

Forcefully closing the connection by RabbitMQ (via webinterface or via rabbitmqadmin close connection), only makes it worse. Every connection that is terminated via this way, results in a reconnect from the Graylog host, and in a new connection (that also does not ack).

I can consistently reproduce this behaviour with the setup mentioned below.

Context

One of our RabbitMQ host had problems keeping up and eventually ran out or memory. We had to restart the RabbitMQ host and the Graylog host.
The problem occurred after one of our ISP's terminated the network connection shortly to switch transit connections.
Graylog not consuming any messages is not preferred, but spawning a large number of non-working consumers made it even worse in our case.

Your Environment

Versions I'm running:
Graylog: 2.4.0-1
(using docker image graylog/graylog:2.4.0-1,
docker-compose.yml taken from: https://hub.docker.com/r/graylog/graylog/)

RabbitMQ: 3.6.10, installed on an up-to-date Ubuntu 17.10 running in Virtualbox 5.2.0
RabbitMQ exchanges & queues
1 exchange (type=direct, durable=true)
1 queue (durable=true)
1 binding between exchange and queue

Graylog AMQP GELF input with default values and correct exchange, queue
and user/password information.

  • Elasticsearch Version: 5.6.5 (also taken from the same docker-compose.yml)
  • MongoDB Version: 3
  • Operating System: Docker 17.06.2-ce on a MacBook

Graylog terminal output contains multiple messages like:

2018-01-09 08:46:15,600 ERROR: org.graylog2.inputs.transports.AmqpConsumer - Error while trying to process AMQP message
graylog_1 | com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

#M blocker bug triaged

Most helpful comment

Same here. The issue is open for half a year now. It's pretty critical, since the RabbitMQ dies and all processing which depends on it stops. Do you need anymore information to reproduce and fix the bug?

best regards,
secana

All 15 comments

Hi,
we experience the same behavior with RabbitMQ 3.6.10. Every time we shut down the RabbitMQ Server the Graylog 2.3 losses its connection and channel. Graylog can recover the connection but not the Channel. We have the same ERROR message like above.

I think the rabbitmq libs need to be updated. https://github.com/rabbitmq/rabbitmq-java-client since https://github.com/Graylog2/graylog2-server/2.4/AmqpConsumer.java line 242: channel.isOpen() is not working.

We turned on Debugging if we can see any Log from https://github.com/Graylog2/graylog2-server/2.4/AmqpConsumer.java line 136 but found nothing. So something is wrong with .isOpen().

It would be really nice to get it fixed since we have to restart Graylog quite often. Thank you.

  • fixed Links

Hey there,

I'm running a productive Graylog Cluster (5 Nodes, 130 Million Msgs/Day) and I'm facing the same problem.

Everytime our RabbitMQ restarts or any other reason why a connection would be closed, the Graylog Server can't reconnect to the RabbitMQ.

I think that Graylog try to reconnect on the same channel, which is on RabbitMQ side closed. So Graylog needs to build up a new connetion instead of reconnecting at the old one.

Would be great if you can fix the problem, we have to restart the graylog nodes every time this issue appears!

Graylog: 2.3.2
RabbitMQ: 3.6

We're experiencing the same behaviour aswell.
Graylog version: 2.4.3
RabbitMQ: 3.6.15

This can be found in the logs:

2018-03-27T17:11:04.172+02:00 ERROR [AmqpConsumer] AMQP connection lost! Trying reconnect in 1 second.
2018-03-27T17:11:04.172+02:00 ERROR [AmqpConsumer] AMQP connection lost! Trying reconnect in 1 second.
2018-03-27T17:11:04.173+02:00 ERROR [AmqpConsumer] AMQP connection lost! Trying reconnect in 1 second.
2018-03-27T17:11:05.215+02:00 INFO  [AmqpConsumer] AMQP prefetch count overriden to <100>.
2018-03-27T17:11:05.215+02:00 INFO  [AmqpConsumer] AMQP prefetch count overriden to <100>.
2018-03-27T17:11:05.216+02:00 INFO  [AmqpConsumer] Connected! Re-starting consumer.
2018-03-27T17:11:05.216+02:00 INFO  [AmqpConsumer] Connected! Re-starting consumer.
2018-03-27T17:11:05.295+02:00 ERROR [AmqpConsumer] Error while trying to process AMQP message
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1014, class-id=60, method-id=80)
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:253) ~[graylog.jar:?]
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:422) ~[graylog.jar:?]
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:416) ~[graylog.jar:?]
        at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1164) ~[graylog.jar:?]
        at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89) ~[graylog.jar:?]
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436) ~[graylog.jar:?]
        at org.graylog2.inputs.transports.AmqpConsumer$2.handleDelivery(AmqpConsumer.java:133) [graylog.jar:?]
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [graylog.jar:?]
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100) [graylog.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_161]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_161]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
...
<lots more of the same stacktraces>

After that it goes silent and the queue starts filling up.

I am also experiencing the same errors.

Graylog Docker image - graylog/graylog:2.4.3-1
RabbitMQ Docker image - rabbitmq:3.7.4-management

I can confirm this happens whenever GL loses connectivity to RMQ, then tries to reconnect. It appears to connect to the broker successfully, but because the channel is closed on the broker, it can't actually start to consume messages until you restart the input. Additionally, it seems like the reconnect attempt spamming (it retries 10+ times a second) is causing the RMQ servers to crash. This is a test environment, so that may just be an instance sizing problem on the RMQ servers (I will be upgrading them today to verify).

2018-04-06 13:45:43,586 ERROR: org.graylog2.inputs.transports.AmqpConsumer - AMQP connection lost! Trying reconnect in 1 second.
2018-04-06 13:45:43,607 INFO : org.graylog2.inputs.transports.AmqpConsumer - AMQP prefetch count overriden to <100>.
2018-04-06 13:45:43,608 INFO : org.graylog2.inputs.transports.AmqpConsumer - Connected! Re-starting consumer.
2018-04-06 13:45:43,610 INFO : org.graylog2.inputs.transports.AmqpConsumer - AMQP prefetch count overriden to <100>.
2018-04-06 13:45:43,613 INFO : org.graylog2.inputs.transports.AmqpConsumer - Connected! Re-starting consumer.
2018-04-06 13:45:43,618 INFO : org.graylog2.inputs.transports.AmqpConsumer - Consumer running.
2018-04-06 13:45:43,618 ERROR: org.graylog2.inputs.transports.AmqpConsumer - AMQP connection lost! Trying reconnect in 1 second.
2018-04-06 13:45:43,628 INFO : org.graylog2.inputs.transports.AmqpConsumer - Consumer running.
2018-04-06 13:45:43,628 ERROR: org.graylog2.inputs.transports.AmqpConsumer - AMQP connection lost! Trying reconnect in 1 second.
2018-04-06 13:45:43,878 ERROR: org.graylog2.inputs.transports.AmqpConsumer - Error while trying to process AMQP message
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 79, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:253) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:422) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:416) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1164) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436) ~[graylog.jar:?]
    at org.graylog2.inputs.transports.AmqpConsumer$2.handleDelivery(AmqpConsumer.java:133) [graylog.jar:?]
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [graylog.jar:?]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100) [graylog.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
2018-04-06 13:45:43,878 ERROR: org.graylog2.inputs.transports.AmqpConsumer - Error while trying to process AMQP message
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 79, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:253) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:422) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:416) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1164) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436) ~[graylog.jar:?]
    at org.graylog2.inputs.transports.AmqpConsumer$2.handleDelivery(AmqpConsumer.java:133) [graylog.jar:?]
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [graylog.jar:?]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100) [graylog.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]

Edit: Added some detail and removed erroneous observation about RabbitMQ crashing (unrelated to this problem).

Same here:

Graylog (by Vagrant) Graylog 2.4.4+4659dbe on graylog (Oracle Corporation 1.8.0_172 on Linux 3.13.0-145-generic)
RabbitMQ 3.6.5, Erlang 19.0

The error message is the same as previous one. And it happens when the connection to the queue server is bad.

Same here. The issue is open for half a year now. It's pretty critical, since the RabbitMQ dies and all processing which depends on it stops. Do you need anymore information to reproduce and fix the bug?

best regards,
secana

Same problem here running graylog + rabbitmq (3.6 and 3.7) in docker:

ERROR: org.graylog2.inputs.transports.AmqpConsumer - Error while trying to process AMQP message
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:198) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:312) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:306) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1165) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436) ~[graylog.jar:?]
    at org.graylog2.inputs.transports.AmqpConsumer$2.handleDelivery(AmqpConsumer.java:133) [graylog.jar:?]
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [graylog.jar:?]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100) [graylog.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_72-internal]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_72-internal]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_72-internal]

Restarting the input fixes the problem (at least until the next time something restarts)

In many environments the network is unreliable, especially over multiple time periods. IMO if this bug isn't going to be resolved (and that's fine of course!) the AMQP transport should be marked deprecated.

I'm also running into this on 2.5.1 as well. Aside from running out of memory in rabbitmq (which we work around by bounding the queue, setting message TTL, and spilling extra log entries), the most frustrating thing here is that we eventually end up with log entries with inappropriate timestamps coming into the system.

It looks like it should be testable by cutting the connection on the server side (rabbitmq).

In my case it's reproducible as @dch suggest: Always when I update rabbitmq (apt-get update) I've to restart graylog afterwards.

@dch @syssi Thank you for the updates. We are going to look at this very soon and plan to fix it until the upcoming 3.0 release.

@bernd I'm able to test this now, email or ping dch on irc in #graylog channel for any further information or help I can offer.

just adding some notes from 2.5.1 from IRC and @dennisoelkers

You can simulate the issue by dropping the amqp connection on rabbitmq admin console, and seeing how graylog amqp input responds on reconnecting.

You'll see this in graylog logs:

root@graylog:~ # 10:55:52.345 [AMQP Connection 10.241.0.4:5672] WARN  com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured (Exception message: Connection reset)
10:55:52.346 [AMQP Connection 10.241.0.4:5672] ERROR org.graylog2.inputs.transports.AmqpConsumer - AMQP connection lost! Trying reconnect in 1 second.
10:55:53.354 [AMQP Connection 10.241.0.4:5672] INFO  org.graylog2.inputs.transports.AmqpConsumer - AMQP prefetch count overriden to <100>.
10:55:53.354 [AMQP Connection 10.241.0.4:5672] INFO  org.graylog2.inputs.transports.AmqpConsumer - Connected! Re-starting consumer.
10:55:53.355 [AMQP Connection 10.241.0.4:5672] INFO  org.graylog2.inputs.transports.AmqpConsumer - Consumer running.
10:55:58.378 [pool-66-thread-19] ERROR org.graylog2.inputs.transports.AmqpConsumer - Error while trying to process AMQP message
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2098, class-id=60, method-id=80)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:253) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:422) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:416) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1164) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89) ~[graylog.jar:?]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436) ~[graylog.jar:?]
    at org.graylog2.inputs.transports.AmqpConsumer$2.handleDelivery(AmqpConsumer.java:133) [graylog.jar:?]
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [graylog.jar:?]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100) [graylog.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_192]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_192]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]

And on the RabbitMQ end the closed connection disappears, but is immediately re-established, obviously on a different TCP port:

image

unfortunately with two connections, only one of which is fetching, and never ACKs the messages. It appears to be blocked, and the other connection isn't doing anything at all so I left it out.

image

Thanks everyone for the information! This will be fixed in the upcoming 3.0 release.

:clap: many thanks @dennisoelkers this works perfectly in 3.0-b3 :rocket: :hamburger:

2019-01-18 13:26:35,448 INFO o.g.i.InputStateListener [eventbus-handler-1] Input [GELF AMQP/5c353f3107c94249aa88153d] is now RUNNING
2019-01-18 13:26:35,456 INFO o.g.i.InputStateListener [eventbus-handler-0] Input [GELF UDP/5bf7d9da07c942b78e2e22b0] is now RUNNING
2019-01-18 13:29:14,575 WARN c.r.c.i.ForgivingExceptionHandler [AMQP Connection 10.241.0.4:5672] An unexpected connection driver error occured (Exception message: Connection reset)
2019-01-18 13:29:14,576 WARN o.g.i.t.AmqpConsumer [AMQP Connection 10.241.0.4:5672] AMQP connection lost! Reconnecting ...

@dch Awesome! Thank you for the feedback! :smiley:

Was this page helpful?
0 / 5 - 0 ratings

Related issues

eroji picture eroji  路  4Comments

bpbp-boop picture bpbp-boop  路  4Comments

mikkolehtisalo picture mikkolehtisalo  路  4Comments

jalogisch picture jalogisch  路  3Comments

mhaasEFD picture mhaasEFD  路  4Comments