Describe the bug
We noticed that some messages were not being processed by our consumer, which uses a Key_Shared subscription.
We only had one consumer at the time, so we checked the subscription via the pulsar-admin topics stats command and noticed that it was showing two consumers instead.
The application is deployed on a Kubernetes cluster, so we scaled all the pods down to make sure that there could be no consumers at all, but the stats still showed the ghost consumer.
Before scaling all the pods down:
"cloud-spaceroom-service" : {
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "msgRateRedeliver" : 0.0,
  "msgBacklog" : 21,
  "blockedSubscriptionOnUnackedMsgs" : false,
  "msgDelayed" : 0,
  "unackedMessages" : 21,
  "type" : "Key_Shared",
  "msgRateExpired" : 0.0,
  "lastExpireTimestamp" : 0,
  "consumers" : [ {
    "msgRateOut" : 0.0,
    "msgThroughputOut" : 0.0,
    "msgRateRedeliver" : 0.0,
    "consumerName" : "a0869e4030",
    "availablePermits" : 0,
    "unackedMessages" : 0,
    "blockedConsumerOnUnackedMsgs" : false,
    "metadata" : { },
    "address" : "/10.1.1.76:33118",
    "connectedSince" : "2020-03-24T09:47:06.109Z"
  }, {
    "msgRateOut" : 0.0,
    "msgThroughputOut" : 0.0,
    "msgRateRedeliver" : 0.0,
    "consumerName" : "0c7db0c8c0",
    "availablePermits" : 1000,
    "unackedMessages" : 0,
    "blockedConsumerOnUnackedMsgs" : false,
    "metadata" : { },
    "address" : "/10.1.1.76:45426",
    "connectedSince" : "2020-03-24T13:53:45.761Z"
  } ],
  "isReplicated" : false
},
After scaling all the pods down:
"cloud-spaceroom-service" : {
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "msgRateRedeliver" : 0.0,
  "msgBacklog" : 21,
  "blockedSubscriptionOnUnackedMsgs" : false,
  "msgDelayed" : 0,
  "unackedMessages" : 21,
  "type" : "Key_Shared",
  "msgRateExpired" : 0.0,
  "lastExpireTimestamp" : 0,
  "consumers" : [ {
    "msgRateOut" : 0.0,
    "msgThroughputOut" : 0.0,
    "msgRateRedeliver" : 0.0,
    "consumerName" : "a0869e4030",
    "availablePermits" : 0,
    "unackedMessages" : 0,
    "blockedConsumerOnUnackedMsgs" : false,
    "metadata" : { },
    "address" : "/10.1.1.76:33118",
    "connectedSince" : "2020-03-24T09:47:06.109Z"
  } ],
  "isReplicated" : false
},
To get rid of the ghost consumer, we had to kill the pod running the Pulsar broker. Once Kubernetes restarted the broker, the stats command finally showed only the connected consumers.
To Reproduce
I haven't been able to replicate the issue yet. I do have all the logs centralized and accessible though. It would be very helpful if you could help us understand what went wrong.
Expected behavior
I would have expected the consumers to disappear after scaling down all the consumer pods, which is what happened for the connected consumer but not for the ghost one (i.e. a0869e4030, see the stats output above).
Worse, messages were still being routed to the ghost consumer.
Additional context
Known information about the ghost consumer:
Additional information:
[...] 10.1.1.76)
[...] 09:46:58.581831923Z
[...] 09:46:57.788 to 09:47:03.631)
[...] 09:47:03.558313436Z) the broker reported that it was creating 2 subscriptions for that topic and that it already had a consumer with id 18 present on the connection: "Consumer with id 18 is already present on the connection"
[...] 09:47:03.616168539Z) that it couldn't reconnect the consumer due to an unknown error
[...] UserJoinedSpace topic at 09:47:06.367671431Z and start again at 09:51:21.172230502Z, which makes for a 254805 ms gap (~4.25 minutes)
More related logs:
Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/UserJoinedSpace, name=nodes-service}, consumerId=0, consumerName=71ea78f19e, address=/10.1.1.76:33122} with pending 0 acks
[/10.1.1.76:33118] Cleared consumer created after timeout on client side Consumer{subscription=PersistentSubscription{topic=persistent://public/default/UserJoinedSpace, name=cloud-spaceroom-service}, consumerId=18, consumerName=a0869e4030, address=/10.1.1.76:33118}
Hi @fracasula. According to your description, you subscribed only once, right? I noticed this in your logs:
[/10.1.1.76:33118] Cleared consumer created after timeout on client side Consumer{subscription=PersistentSubscription{topic=persistent://public/default/UserJoinedSpace, name=cloud-spaceroom-service}, consumerId=18, consumerName=a0869e4030, address=/10.1.1.76:33118}
It looks like the issue is related to the consumer creation timeout. Can you find any log entries containing "Closed connection from"?
@wolfstudy Hi xiaolong. I want to invite you to join the discussion. Maybe the problem is that we don't close the consumer on the broker side after consumer creation times out on the client side.
Hi @codelipenghui, yes, that's correct. Our code has been audited by 3 different engineers and we can confirm that we're subscribing only once.
Our client logs suggest that the client was having issues connecting to the broker, so it scheduled a few reconnections. On the broker side, we do see that the broker received multiple requests to create that consumer and, although it detected that a consumer with ID 18 was already present on the connection, my guess is that it never closed one of the two.
These are the logs you requested, containing every "Closed connection from" entry on the broker side: closed_connections_logs.txt.
We should expect one from 10.1.1.76:33118, right?
Another question, if you don't mind. I was looking at this bit of code, which clearly gets called in this case. I'm not very familiar with Java and futures, so pardon me if the question doesn't make sense. In the comment right above that line you explain that you need to wait until the previous consumer creation request either completes or fails. What happens if that future hangs forever? Is that even possible, or is there a timeout that prevents it so that isDone eventually returns true?
EDIT: I forgot to mention, the IP you see (i.e. 10.1.1.76) is not the direct client but the Pulsar Proxy. I'll add it above as well.
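To make the question about a hanging future concrete, here is a minimal, self-contained sketch of how a plain Java CompletableFuture behaves. It is not the broker's code: the class name PendingFutureDemo and the helper completesWithin are hypothetical, and orTimeout requires Java 9 or later. A CompletableFuture has no built-in deadline, so if nothing ever completes it, isDone keeps returning false; a timeout has to be attached explicitly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PendingFutureDemo {

    /**
     * Returns true if the future completes (normally or exceptionally)
     * within timeoutMillis, false if it is still pending after that time.
     */
    static boolean completesWithin(CompletableFuture<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still pending: nobody completed the future
        } catch (ExecutionException e) {
            return true; // completed exceptionally, which still counts as done
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // A future that no code path ever completes stays pending indefinitely.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println(neverCompleted.isDone());              // false
        System.out.println(completesWithin(neverCompleted, 100)); // false

        // With an explicit timeout (Java 9+), the future fails with a
        // TimeoutException after 100 ms, so isDone eventually becomes true.
        CompletableFuture<String> bounded =
                new CompletableFuture<String>().orTimeout(100, TimeUnit.MILLISECONDS);
        System.out.println(completesWithin(bounded, 1000));       // true
        System.out.println(bounded.isCompletedExceptionally());   // true
    }
}
```

Whether the broker's consumer creation future can actually stay pending depends on the code paths that complete it, which this sketch does not model.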
@codelipenghui Thanks for finding the root cause, and thanks @fracasula for the feedback. It seems we need to remove the consumer when the timeout occurs.
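The idea of removing the consumer when creation times out can be sketched as follows. This is a simplified model under stated assumptions, not Pulsar's broker code: the class CreationTimeoutCleanup and its methods are hypothetical, and the set of names stands in for the subscription's consumer list. It relies only on the fact that CompletableFuture.complete returns false when the future was already completed, which lets the creation path notice that the client gave up in the meantime.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CreationTimeoutCleanup {

    // Hypothetical stand-in for the consumers attached to a subscription.
    private final Set<String> attachedConsumers = ConcurrentHashMap.newKeySet();

    /** Called when the client gives up on a creation request that is still pending. */
    public void onClientTimeout(CompletableFuture<String> consumerFuture) {
        consumerFuture.completeExceptionally(
                new IllegalStateException("consumer closed before creation completed"));
    }

    /** Called when the (possibly slow) consumer creation finally finishes. */
    public void onConsumerCreated(String consumerName, CompletableFuture<String> consumerFuture) {
        attachedConsumers.add(consumerName);
        if (!consumerFuture.complete(consumerName)) {
            // The future was already completed by onClientTimeout, so the client
            // no longer knows about this consumer. Detach it from the subscription
            // instead of leaving it behind as a ghost.
            attachedConsumers.remove(consumerName);
        }
    }

    public boolean isAttached(String consumerName) {
        return attachedConsumers.contains(consumerName);
    }
}
```

In this model, a consumer whose creation completes after the client timed out never stays attached, while a consumer created in time does.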
@codelipenghui @jiazhai Hello guys, any updates on this one? We noticed the new 2.5.1 release and went through the changelog, but it looks like version 2.5.1 does not contain a bugfix for this, correct? Thanks!
@fracasula I think 2.5.1 does not contain a bugfix for this issue. The simple way to resolve this problem seems to be closing the current connection and reconnecting to the broker when the timeout occurs, so that the broker can clean up the old consumers.
The other way is introducing a mechanism for the consumer heartbeat, which looks somewhat complicated.
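As a rough illustration of what a consumer heartbeat could look like, here is a minimal sketch. Everything in it is an assumption made for illustration: the class ConsumerHeartbeatRegistry, its methods, and the 30-second timeout are hypothetical and do not exist in Pulsar. The broker would record the last heartbeat per consumer and periodically evict consumers that have been silent for longer than the timeout. Time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConsumerHeartbeatRegistry {

    private final long timeoutMillis;
    // Consumer name -> timestamp (ms) of the last heartbeat received.
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    public ConsumerHeartbeatRegistry(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a heartbeat; the first heartbeat also registers the consumer. */
    public void heartbeat(String consumerName, long nowMillis) {
        lastSeen.put(consumerName, nowMillis);
    }

    /** Removes and returns every consumer silent for longer than the timeout. */
    public List<String> evictStale(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        lastSeen.forEach((name, seenAt) -> {
            // remove(key, value) only succeeds if no newer heartbeat arrived meanwhile.
            if (nowMillis - seenAt > timeoutMillis && lastSeen.remove(name, seenAt)) {
                evicted.add(name);
            }
        });
        return evicted;
    }

    public boolean isRegistered(String consumerName) {
        return lastSeen.containsKey(consumerName);
    }
}
```

For example, with a 30-second timeout, a consumer that last sent a heartbeat at t = 0 is evicted by a sweep at t = 45 s, while one that sent a heartbeat at t = 40 s stays registered.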