Pulsar: Stuck consumer: terminate called after throwing an instance of 'std::bad_alloc'

Created on 28 Jul 2020 · 37 Comments · Source: apache/pulsar

Describe the bug
A consumer gets stuck after seeing an error that most probably comes from the underlying cgo layer:

terminate called after throwing an instance of 'std::bad_alloc'
what(): std::bad_alloc

To Reproduce
Steps to reproduce the behavior:

  1. Shell into a Kubernetes pod and reset the cursor for a given subscription, e.g. bin/pulsar-admin topics reset-cursor persistent://public/default/SpaceEvents -s cloud-notifications-service -t 999w
  2. The consumer reads a few messages and then fails with the above error. It doesn't seem to attempt anything (e.g. reconnection, termination...); it just gets stuck.
  3. If we terminate the service manually, it resumes consuming, but after a while it gets stuck again.

Expected behavior
I would expect the consumer neither to block nor to raise a bad_alloc error.

Screenshots
No screenshots available.

Desktop (please complete the following information):

  • OS: Kubernetes on GCP

Additional context
I cross-referenced our consumer logs with the Pulsar side to see what happens when we get the bad_alloc errors, and we found some interesting exceptions that seem to occur at the same time as the bad_alloc errors (see attached report).

Some errors are particularly interesting and make me think that we might have issues when reading entries from a ledger (BookKeeper). Is there anything you can suggest to better debug this? Thanks!

report.txt

component/c++ component/go triage/week-31 type/bug

Most helpful comment

After further investigation I came across two scenarios, which may be related.

Silent broker scenario

  • all consumers are connected with PING and PONG responses travelling over the wire
  • the stats tool reports several messages in the msgBacklog for the given topic/subscription and it shows all the consumers as connected but with a msgRateOut of 0
  • looking at the Pulsar binary protocol exchange, everything goes smoothly for all consumers, even after a restart, up to the flow stage; then they hang forever waiting for a MESSAGE command that never comes. In other words, they connect to the broker, send the 1000 permits and from then on get only pings, no messages whatsoever
  • killing the Pulsar proxy or restarting the consumers does not help
  • restarting the brokers fixes the issue
  • when trying with the latest official Python client (2.6.1), it just dies as shown below
2020-08-31 08:37:07.728 INFO  [139696078108480] Client:88 | Subscribing on Topic :SpaceEvents
2020-08-31 08:37:07.728 INFO  [139696078108480] ConnectionPool:85 | Created connection for pulsar://localhost:6650
2020-08-31 08:37:07.729 INFO  [139696048822016] ClientConnection:343 | [[::1]:57112 -> [::1]:6650] Connected to broker
2020-08-31 08:37:08.286 INFO  [139696048822016] HandlerBase:53 | [persistent://public/default/SpaceEvents, cloud-pulsar-tester, 0] Getting connection from pool
2020-08-31 08:37:08.426 INFO  [139696048822016] ConnectionPool:85 | Created connection for pulsar://10.56.3.23:6650
2020-08-31 08:37:08.427 INFO  [139696048822016] ClientConnection:345 | [[::1]:57114 -> [::1]:6650] Connected to broker through proxy. Logical broker: pulsar://10.56.3.23:6650
2020-08-31 08:37:08.968 WARN  [139696048822016] ClientConnection:947 | [[::1]:57114 -> [::1]:6650] Received error response from server: UnknownError -- req_id: 0
2020-08-31 08:37:08.968 ERROR [139696048822016] ConsumerImpl:242 | [persistent://public/default/SpaceEvents, cloud-pulsar-tester, 0] Failed to create consumer: UnknownError
Traceback (most recent call last):
  File "main.py", line 4, in <module>
    consumer = client.subscribe('SpaceEvents', 'cloud-pulsar-tester')
  File "/home/francesco/.local/lib/python3.8/site-packages/pulsar/__init__.py", line 655, in subscribe
    c._consumer = self._client.subscribe(topic, subscription_name, conf)
Exception: Pulsar error: UnknownError
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:1387 | [[::1]:57114 -> [::1]:6650] Connection closed
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:1387 | [[::1]:57112 -> [::1]:6650] Connection closed
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:238 | [[::1]:57114 -> [::1]:6650] Destroyed connection
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:238 | [[::1]:57112 -> [::1]:6650] Destroyed connection

This is the Python code I used:

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('SpaceEvents', 'cloud-pulsar-tester')


def consume():
    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
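        except Exception:
            # Message failed to be processed: ask the broker to redeliver it
            consumer.negative_acknowledge(msg)


if __name__ == '__main__':
    try:
        consume()
    finally:
        # consume() never returns on its own, so close the client on the way out (e.g. Ctrl+C)
        client.close()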

And the requirements.txt:

pulsar-client==2.6.1
apache-bookkeeper-client==4.11.0
grpcio<1.26.0
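On the flow stage in the silent broker scenario: as I understand the clients' flow control, a consumer grants the broker permits with a FLOW command (the 1000 observed above matches the default receiver queue size) and only sends further FLOW commands as the application consumes messages, refilling once about half of the queue has been used. The refill-at-half threshold is an assumption here, and permitTracker is a hypothetical, simplified model rather than client code. Its point is that a consumer which has already granted its permits and receives no MESSAGE has nothing left to send but PONGs, which fits the stall being on the broker side:

```go
package main

import "fmt"

// permitTracker is a simplified model of consumer-side flow control: the
// consumer first grants queueSize permits, then re-grants permits in batches
// once half of the queue has been consumed (the threshold is an assumption).
type permitTracker struct {
	queueSize int
	consumed  int // messages consumed since the last FLOW was sent
}

// onMessageConsumed records one message handed to the application and returns
// how many permits to send in a new FLOW command (0 means no FLOW yet).
func (p *permitTracker) onMessageConsumed() int {
	p.consumed++
	if p.consumed >= p.queueSize/2 {
		n := p.consumed
		p.consumed = 0
		return n
	}
	return 0
}

func main() {
	p := &permitTracker{queueSize: 1000}
	fmt.Println("initial FLOW permits:", p.queueSize)
	for i := 1; i <= 1000; i++ {
		if n := p.onMessageConsumed(); n > 0 {
			fmt.Printf("after %d messages: FLOW %d\n", i, n)
		}
	}
	// If the broker never sends a MESSAGE, onMessageConsumed is never called and
	// the client has no reason to send another FLOW.
}
```

Running it prints the initial grant of 1000, then a FLOW of 500 after message 500 and again after message 1000.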

Possible acking deadlock (Golang client)

  • all consumers report PING and PONG responses in their logs (verbosity set to trace)
  • the stats tool reports no consumers at all, even though all clients successfully print the PING/PONG exchanges in their logs
  • the stats tool reports a msgBacklog greater than zero so there are messages waiting to be processed
  • by getting a full goroutine stack dump I was able to determine that all consumers are stuck here
  • to make 100% sure this wasn't just the service happening to be acking every time I took the goroutine stack dump, I modified the Golang client to add a ticker that logs "Trying to ack" whenever an ack takes longer than 150ms

Concretely, for the last bullet point I changed this code:

func (pc *partitionConsumer) runEventsLoop() {
    defer func() {
        pc.log.Debug("exiting events loop")
    }()
    for {
        select {
        case <-pc.closeCh:
            return
        case i := <-pc.eventsCh:
            switch v := i.(type) {
            case *ackRequest:
                pc.internalAck(v)

To this:

func (pc *partitionConsumer) runEventsLoop() {
    defer func() {
        pc.log.Debug("exiting events loop")
    }()
    for {
        select {
        case <-pc.closeCh:
            return
        case i := <-pc.eventsCh:
            switch v := i.(type) {
            case *ackRequest:
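                ctx, cancel := context.WithCancel(context.Background())
                go func(v *ackRequest) {
                    // Log every 150ms for as long as internalAck has not returned;
                    // the (%d - %d) pair is len and cap of eventsCh.
                    ticker := time.NewTicker(150 * time.Millisecond)
                    defer ticker.Stop()
                    for {
                        select {
                        case <-ctx.Done():
                            return
                        case <-ticker.C:
                            pc.log.Infof("Trying to ack %+v (%d - %d)", v.msgID, len(pc.eventsCh), cap(pc.eventsCh))
                        }
                    }
                }(v)

                pc.internalAck(v)
                cancel()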

If the ack completes within 150ms, nothing is logged. The problem is that now all consumers are stuck in a never-ending loop, just printing:

{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}

This has been going on for several hours and the message ID of the log entry is always the same until I kill the consumer.

Could it be that, since eventsCh is a buffered channel with a capacity of 3, when a connection gets closed because of a too-big message frame, runEventsLoop() never gets to process the *connectionClosed event because at least 3 ack requests are already in flight?

In other words: 3 ack requests are already keeping the channel full; the connection gets closed; the acks can't be processed because the connection is closed; and we cannot reconnect to the broker because, with the channel full, the *connectionClosed event can't be pushed into eventsCh, so it never gets processed. Deadlock?

In support of this theory, I can see this in the logs right before the never-ending "Trying to ack" loop starts:

{"error":"write tcp [::1]:35530-\u003e[::1]:6650: use of closed network connection","level":"warn","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Failed to write on connection","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"WARNING","time":"2020-08-31T17:47:23+02:00"}
{"level":"debug","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Write data: 25","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"DEBUG","time":"2020-08-31T17:47:23+02:00"}
{"error":"write tcp [::1]:35530-\u003e[::1]:6650: use of closed network connection","level":"warn","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Failed to write on connection","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"WARNING","time":"2020-08-31T17:47:23+02:00"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:23+02:00","topic":"persistent://public/default/SpaceEvents"}
{"level":"info","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Connection closed","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"INFO","time":"2020-08-31T17:47:23+02:00"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:23+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}

Also, the stack dump shows that one of the 21 running goroutines is stuck waiting here:

1 @ 0x449d3b 0x41229f 0x412095 0xc5b3f5 0xc46394 0xc4ceb9 0xc41f5e 0xc4ccc6 0x47d571
#   0xc5b3f4    github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).ConnectionClosed+0x64    /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/consumer_partition.go:567
#   0xc46393    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).Close+0x583        /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:751
#   0xc4ceb8    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run.func1+0x168    /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:363
#   0xc41f5d    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run+0x2bd      /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:369
#   0xc4ccc5    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1+0x85   /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:231

Killing the consumer helps in this case, but it eventually gets stuck again trying to ack some other message.

We could try to look into the second scenario ourselves, but we have no clue about the first one and no Java expertise. Could someone please look into this a bit more and let us know whether you need more information? Thanks.
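The two scenarios can be told apart from the subscription stats: connected consumers with msgRateOut at 0 in the first, no consumers at all in the second, and a non-zero msgBacklog in both. A minimal sketch of that check, assuming the topic stats JSON exposes subscriptions with msgRateOut, msgBacklog and consumers fields (verify against the output of your broker version); classify and its labels are hypothetical, and the sample input is made up:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Only the fields needed here; assumed to match the topic stats JSON.
type subStats struct {
	MsgRateOut float64           `json:"msgRateOut"`
	MsgBacklog int64             `json:"msgBacklog"`
	Consumers  []json.RawMessage `json:"consumers"`
}

type topicStats struct {
	Subscriptions map[string]subStats `json:"subscriptions"`
}

const (
	verdictNoBacklog    = "no backlog"
	verdictDraining     = "backlog draining"
	verdictSilentBroker = "backlog, consumers connected, nothing dispatched (silent broker?)"
	verdictAckDeadlock  = "backlog, no consumers registered (acking deadlock?)"
)

// classify maps one subscription's stats snapshot to the symptoms described above.
func classify(raw []byte, sub string) (string, error) {
	var st topicStats
	if err := json.Unmarshal(raw, &st); err != nil {
		return "", err
	}
	s, ok := st.Subscriptions[sub]
	if !ok {
		return "", fmt.Errorf("subscription %q not found", sub)
	}
	switch {
	case s.MsgBacklog == 0:
		return verdictNoBacklog, nil
	case len(s.Consumers) == 0:
		return verdictAckDeadlock, nil
	case s.MsgRateOut == 0:
		return verdictSilentBroker, nil
	default:
		return verdictDraining, nil
	}
}

func main() {
	// Made-up sample shaped like the silent broker symptoms.
	sample := []byte(`{"subscriptions":{"cloud-pulsar-tester":{"msgRateOut":0,"msgBacklog":42,"consumers":[{},{}]}}}`)
	verdict, err := classify(sample, "cloud-pulsar-tester")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(verdict)
}
```

A single snapshot can't tell a stall from a momentary lull, so in practice one would sample the stats a few times before acting on the verdict.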

All 37 comments

Just tried to consume from the same topic with the new Golang library (I noticed that you deprecated the cgo one). With the pure Golang client, instead of getting stuck with the bad_alloc, it fails with this:

WARN[0034] Received too big frame size. size=234969386   local_addr="[::1]:44310" remote_addr="pulsar://localhost:6650"
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x68 pc=0xadee3d]

goroutine 409 [running]:
github.com/apache/pulsar-client-go/pulsar/internal.(*MessageReader).readChecksum(0xc00067d8f8, 0x0, 0xc00067d6d8, 0x4160bd)
    /home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/commands.go:78 +0x2d
github.com/apache/pulsar-client-go/pulsar/internal.(*MessageReader).ReadMessageMetadata(0xc00067d8f8, 0xc00067d910, 0x0, 0x0)
    /home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/commands.go:95 +0x40
github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).MessageReceived(0xc00010a4e0, 0xc0003e2f40, 0x0, 0x0, 0x1, 0x0)
    /home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/consumer_partition.go:370 +0xa8
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleMessage(0xc0002e9560, 0xc0003e2f40, 0x0, 0x0)
    /home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/connection.go:543 +0x142
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand(0xc0002e9560, 0xc0000188c0, 0x0, 0x0)
    /home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/connection.go:457 +0x24d
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc0002e9560)
    /home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/connection.go:340 +0x304
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1(0xc0002e9560)
    /home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/connection.go:204 +0x59
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start
    /home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/connection.go:201 +0x3f
make: *** [Makefile:46: run] Error 2

Right before the panic, your library logs that it received a frame that is too big (size=234969386, about 224 MiB). Any ideas?

I realized that 2.6.0 is out and we're running 2.5.1. We'll upgrade and get back to you; sorry for the inconvenience.

This morning we upgraded production to 2.6.0 with the latest Pulsar Golang client library (i.e. no cgo). Of course, we tested our application in both testing and staging before going live and didn't find any compatibility issues. Unfortunately, as soon as we deployed the new version, the scenario described above happened on a much larger scale, affecting several consumers at the same time.

The backlog started to grow quite a bit, so we decided to downgrade to 2.5.1. That helped a bit, but some consumers were definitely still stuck. We then upgraded again to 2.6.0, this time with autoSkipNonRecoverableData set to true. With these settings, plus jobs that periodically restart our pods in Kubernetes, we managed to make it more stable.

Of course we would like to avoid having to kill our pods on a regular basis so it would be nice to find a solution.

On the client side there's nothing relevant in the logs from the Pulsar library; it just reports entries like:

{"level":"debug","name":"cloud-nodes-service-3681597d6aa5773d","severity":"DEBUG","subscription":"cloud-nodes-service","time":"2020-08-11T07:34:27Z","topic":"persistent://public/default/AlarmEventsUpdated"}
{"cnx":"10.1.15.135:40246 -\u003e 10.2.7.212:6650","level":"debug","severity":"DEBUG","time":"2020-08-11T07:34:27Z","topic":"persistent://public/default/AlarmEventsUpdated"}
{"cnx":"10.1.15.135:40246 -\u003e 10.2.7.212:6650","level":"info","producer_name":"cloud-nodes-service-3681597d6aa5773d","severity":"INFO","time":"2020-08-11T07:34:27Z","topic":"persistent://public/default/AlarmEventsUpdated"}

On the server side (mostly the Pulsar broker), however, we got plenty of exceptions, such as:

pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:35:58.848 [bookkeeper-ml-workers-OrderedExecutor-1-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/NodeAlarmCountersUpdated] Error opening ledger for reading at position 297169:27536 - org.apache.bookkeeper.mledger.ManagedLedgerException: Unknown exception
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:35:58.848 [bookkeeper-ml-workers-OrderedExecutor-1-0] WARN  org.apache.bookkeeper.mledger.impl.OpReadEntry - [public/default/persistent/NodeAlarmCountersUpdated][cloud-nodes-service] read failed from ledger at position:297169:27536 : Unknown exception
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:35:58.848 [bookkeeper-ml-workers-OrderedExecutor-1-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/NodeAlarmCountersUpdated / cloud-nodes-service] Error reading entries at 297169:27536 : Unknown exception, Read Type Normal - Retrying to read in 54.321 seconds
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:00.406 [pulsar-io-23-4] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/NodeInfoUpdated / cloud-spaceroom-service] Retrying read operation
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:00.428 [offloader-OrderedScheduler-1-0] ERROR org.jclouds.http.functions.ParseJson - Error parsing input: null
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker java.lang.NullPointerException: null
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.json.internal.NullFilteringTypeAdapterFactories$MapTypeAdapter.read(NullFilteringTypeAdapterFactories.java:319) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.json.internal.NullFilteringTypeAdapterFactories$MapTypeAdapter.read(NullFilteringTypeAdapterFactories.java:287) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.json.internal.DeserializationConstructorAndReflectiveTypeAdapterFactory$ParameterReader.read(DeserializationConstructorAndReflectiveTypeAdapterFactory.java:275) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.json.internal.DeserializationConstructorAndReflectiveTypeAdapterFactory$DeserializeIntoParameterizedConstructor.read(DeserializationConstructorAndReflectiveTypeAdapterFactory.java:187) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.gson.Gson.fromJson(Gson.java:888) ~[com.google.code.gson-gson-2.8.2.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.gson.Gson.fromJson(Gson.java:853) ~[com.google.code.gson-gson-2.8.2.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.json.internal.GsonWrapper.fromJson(GsonWrapper.java:55) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:82) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:76) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:61) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:41) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.rest.internal.InvokeHttpMethod.invoke(InvokeHttpMethod.java:91) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.rest.internal.InvokeHttpMethod.apply(InvokeHttpMethod.java:74) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.rest.internal.InvokeHttpMethod.apply(InvokeHttpMethod.java:45) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.reflect.FunctionalReflection$FunctionalInvocationHandler.handleInvocation(FunctionalReflection.java:117) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.common.reflect.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:86) ~[com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.sun.proxy.$Proxy122.getObject(Unknown Source) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.googlecloudstorage.blobstore.GoogleCloudStorageBlobStore.getBlob(GoogleCloudStorageBlobStore.java:246) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.blobstore.internal.BaseBlobStore.getBlob(BaseBlobStore.java:217) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.inject.internal.DelegatingInvocationHandler.invoke(DelegatingInvocationHandler.java:37) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.sun.proxy.$Proxy73.getBlob(Unknown Source) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl.open(BlobStoreBackedReadHandleImpl.java:195) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.lambda$readOffloaded$5(BlobStoreManagedLedgerOffloader.java:556) ~[?:?] 
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:00.444 [offloader-OrderedScheduler-1-0] ERROR org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader - Failed readOffloaded:
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker org.jclouds.http.HttpResponseException: Error parsing input: null

or

pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.482 [offloader-OrderedScheduler-0-0] ERROR org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader - Failed readOffloaded:
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker org.jclouds.http.HttpResponseException: Error parsing input: null
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker {statusCode=200, message=OK, headers={Server=[UploadServer], ETag=[CNqKo4De+OoCEAE=], X-GUploader-UploadID=[AAANsUn4S2D94evffqRwEGbgp4-4afCAlL11_kSwnURQy8ukjkf4B9-9HFkMFNoBy62PYgKa7ZRFwa-aJB73GpNWRYl4g6Uk8w], Vary=[X-Origin, Origin], Date=[Tue, 11 Aug 2020 07:36:04 GMT]}, payload=[content=true, contentMetadata=[cacheControl=private, max-age=0, must-revalidate, no-transform, contentDisposition=null, contentEncoding=null, contentLanguage=null, contentLength=1234, contentMD5=null, contentType=application/json; charset=UTF-8, expires=Tue Aug 11 07:36:04 UTC 2020], written=false, isSensitive=false]}
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:67) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:41) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.rest.internal.InvokeHttpMethod.invoke(InvokeHttpMethod.java:91) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.rest.internal.InvokeHttpMethod.apply(InvokeHttpMethod.java:74) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.rest.internal.InvokeHttpMethod.apply(InvokeHttpMethod.java:45) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.reflect.FunctionalReflection$FunctionalInvocationHandler.handleInvocation(FunctionalReflection.java:117) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.common.reflect.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:86) ~[com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.sun.proxy.$Proxy122.getObject(Unknown Source) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.googlecloudstorage.blobstore.GoogleCloudStorageBlobStore.getBlob(GoogleCloudStorageBlobStore.java:246) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.blobstore.internal.BaseBlobStore.getBlob(BaseBlobStore.java:217) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.inject.internal.DelegatingInvocationHandler.invoke(DelegatingInvocationHandler.java:37) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.sun.proxy.$Proxy73.getBlob(Unknown Source) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl.open(BlobStoreBackedReadHandleImpl.java:195) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.lambda$readOffloaded$5(BlobStoreManagedLedgerOffloader.java:556) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker Caused by: java.lang.NullPointerException
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.json.internal.NullFilteringTypeAdapterFactories$MapTypeAdapter.read(NullFilteringTypeAdapterFactories.java:319) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.json.internal.NullFilteringTypeAdapterFactories$MapTypeAdapter.read(NullFilteringTypeAdapterFactories.java:287) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.json.internal.DeserializationConstructorAndReflectiveTypeAdapterFactory$ParameterReader.read(DeserializationConstructorAndReflectiveTypeAdapterFactory.java:275) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.json.internal.DeserializationConstructorAndReflectiveTypeAdapterFactory$DeserializeIntoParameterizedConstructor.read(DeserializationConstructorAndReflectiveTypeAdapterFactory.java:187) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.gson.Gson.fromJson(Gson.java:888) ~[com.google.code.gson-gson-2.8.2.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at com.google.gson.Gson.fromJson(Gson.java:853) ~[com.google.code.gson-gson-2.8.2.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.json.internal.GsonWrapper.fromJson(GsonWrapper.java:55) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:82) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:76) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:61) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker    ... 28 more
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.495 [bookkeeper-ml-workers-OrderedExecutor-5-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/AlarmEventsUpdated] Error opening ledger for reading at position 297170:27534 - org.apache.bookkeeper.mledger.ManagedLedgerException: Unknown exception
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.495 [bookkeeper-ml-workers-OrderedExecutor-5-0] WARN  org.apache.bookkeeper.mledger.impl.OpReadEntry - [public/default/persistent/AlarmEventsUpdated][cloud-nodes-service] read failed from ledger at position:297170:27534 : Unknown exception
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.495 [bookkeeper-ml-workers-OrderedExecutor-5-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/AlarmEventsUpdated / cloud-nodes-service] Error reading entries at 297170:27534 : Unknown exception, Read Type Normal - Retrying to read in 59.128 seconds
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.935 [pulsar-io-23-3] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/NodeInfoUpdated / cloud-spaceroom-service] Retrying read operation
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.954 [offloader-OrderedScheduler-1-0] ERROR org.jclouds.http.functions.ParseJson - Error parsing input: null
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:07.742 [pulsar-io-23-1] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/AlarmEventsUpdated / cloud-nodes-service] Retrying read operation
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:07.746 [pulsar-load-manager-3-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Writing local data to ZooKeeper because maximum change 10.64431369304657% exceeded threshold 10%; time since last report written is 10.0 seconds
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:07.751 [pulsar-ordered-OrderedExecutor-5-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x101c63ce3fa0003 local:/10.1.29.4:52068 remoteserver:pulsar-zookeeper-1.pulsar-zookeeper/10.1.34.231:2181 lastZxid:47245067907 xid:29534 sent:29536 recv:30476 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/loadbalance/brokers/10.1.29.4:8080
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:07.761 [offloader-OrderedScheduler-0-0] ERROR org.jclouds.http.functions.ParseJson - Error parsing input: null
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker java.lang.NullPointerException: null

Here are the full logs.

Any ideas?

/cc @gaoran10 this looks like the same problem you ran into. Could you please help take a look?

@fracasula Hi, the jclouds NPE is similar to issue #7402. Pulsar 2.6.0 cannot reliably read offloaded data from GCS or offload data to GCS. Perhaps you offloaded the data successfully with Pulsar 2.5.0 and then failed to read it back with Pulsar 2.6.0. This is already fixed by PR #7435, which will be included in Pulsar 2.6.1.

@codelipenghui @gaoran10 We created a clone of the whole production environment, including the offload buckets, and even with 2.6.1 we can still reproduce the issue: the consumers still get stuck. The only way we have found to unblock them is to manually kill the pod on Kubernetes.

I deployed 3 consumers with a Key_Shared subscription listening to our SpaceEvents topic. Once all three were running, I reset the subscription's cursor with bin/pulsar-admin topics reset-cursor SpaceEvents -s cloud-pulsar-tester -t 99999w. After the reset the 3 consumers started reading messages, and the msgRateOut shown by bin/pulsar-admin topics stats SpaceEvents confirms that all 3 are consuming.
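Why a relative time like `-t 99999w` rewinds the whole subscription comes down to simple arithmetic. The sketch below assumes the CLI turns the relative time into a cutoff of "now minus the duration" and that the broker then moves the cursor to the first message published at or after that cutoff; `relativeToMillis` and `resetTarget` are hypothetical helpers for illustration, not Pulsar code.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// unitMillis maps the relative-time suffixes this sketch accepts to milliseconds.
var unitMillis = map[byte]int64{
	's': 1000,
	'm': 60 * 1000,
	'h': 60 * 60 * 1000,
	'd': 24 * 60 * 60 * 1000,
	'w': 7 * 24 * 60 * 60 * 1000,
}

// relativeToMillis is a hypothetical parser for values such as "15m" or
// "99999w". It works in int64 milliseconds instead of time.Duration, since
// 99999 weeks (~1900 years) overflows time.Duration (~292 years).
func relativeToMillis(s string) (int64, error) {
	if len(s) < 2 {
		return 0, fmt.Errorf("invalid relative time %q", s)
	}
	mult, ok := unitMillis[s[len(s)-1]]
	if !ok {
		return 0, fmt.Errorf("unknown unit in %q", s)
	}
	n, err := strconv.ParseInt(s[:len(s)-1], 10, 64)
	if err != nil {
		return 0, err
	}
	return n * mult, nil
}

// resetTarget returns the publish-time cutoff in epoch millis, assuming the
// reset is computed as "now minus the relative duration".
func resetTarget(nowMillis int64, rel string) (int64, error) {
	d, err := relativeToMillis(rel)
	if err != nil {
		return 0, err
	}
	return nowMillis - d, nil
}

func main() {
	now := time.Now().UnixNano() / int64(time.Millisecond)
	for _, rel := range []string{"999w", "99999w"} {
		cutoff, err := resetTarget(now, rel)
		if err != nil {
			panic(err)
		}
		// Split into seconds and nanoseconds so large negative values cannot overflow.
		t := time.Unix(cutoff/1000, (cutoff%1000)*int64(time.Millisecond)).UTC()
		fmt.Printf("-t %s -> cutoff %d ms (year %d)\n", rel, cutoff, t.Year())
	}
}
```

Under that assumption, both values put the cutoff long before any message could have been published, so the cursor lands on the oldest retained entry (including offloaded ledgers).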

At some point, when there are around 181k messages left in the backlog, the consumers get stuck.

The Golang µservices' logs report only warnings, and they come from the Pulsar library itself (which uses logrus), not from our code:

{ remote_addr: { ForceQuery: false Host: "pulsar-proxy.pulsar:6650" RawPath: "" User: null Fragment: "" Opaque: "" RawQuery: "" Path: "" Scheme: "pulsar" RawFragment: "" } level: "warn" local_addr: { Port: 44814 IP: "10.56.0.18" Zone: "" } }
{ remote_addr: { RawPath: "" Scheme: "pulsar" Path: "" Opaque: "" RawQuery: "" Host: "pulsar-proxy.pulsar:6650" RawFragment: "" Fragment: "" User: null ForceQuery: false } level: "warn" error: "write tcp 10.56.0.18:44814->10.0.1.68:6650: use of closed network connection" local_addr: { Port: 44814 Zone: "" IP: "10.56.0.18" } }
{ remote_addr: { RawQuery: "" Opaque: "" ForceQuery: false RawFragment: "" Scheme: "pulsar" Fragment: "" RawPath: "" Path: "" Host: "pulsar-proxy.pulsar:6650" User: null } local_addr: { Zone: "" Port: 42612 IP: "10.56.0.21" } level: "warn" }
{ level: "warn" remote_addr: { User: null Path: "" Scheme: "pulsar" RawQuery: "" Fragment: "" Opaque: "" Host: "pulsar-proxy.pulsar:6650" ForceQuery: false RawPath: "" RawFragment: "" } local_addr: { Zone: "" IP: "10.56.0.21" Port: 42612 } error: "write tcp 10.56.0.21:42612->10.0.1.68:6650: use of closed network connection" }
{ local_addr: { Zone: "" Port: 51340 IP: "10.56.0.17" } remote_addr: { User: null ForceQuery: false RawPath: "" RawFragment: "" RawQuery: "" Path: "" Host: "pulsar-proxy.pulsar:6650" Opaque: "" Fragment: "" Scheme: "pulsar" } level: "warn" }

The Pulsar logs report the following (grepping for exceptions).

Broker

java.lang.NullPointerException: null

org.apache.bookkeeper.mledger.ManagedLedgerException: Unknown exception

08:25:35.597 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/SpaceEvents / cloud-pulsar-tester] Error reading entries at 1430:0 : Unknown exception, Read Type Replay - Retrying to read in 15.0 seconds

java.lang.NullPointerException: null
    at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1776) ~[org.apache.pulsar-pulsar-common-2.6.1.jar:2.6.1]
    at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:87) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:192) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:480) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]

java.lang.NullPointerException: null
    at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:125) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:192) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:480) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]

java.lang.NullPointerException: null
    at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]

08:27:09.703 [bookkeeper-ml-workers-OrderedExecutor-3-0] WARN  org.apache.bookkeeper.mledger.impl.OpReadEntry - [public/default/persistent/SpaceEvents][cloud-pulsar-tester] read failed from ledger at position:2193:0 : Unknown exception

08:27:09.703 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/SpaceEvents / cloud-pulsar-tester] Error reading entries at 2193:0 : Unknown exception, Read Type Normal - Retrying to read in 15.0 seconds

java.lang.NullPointerException: null
    at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1776) ~[org.apache.pulsar-pulsar-common-2.6.1.jar:2.6.1]
    at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:87) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:192) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:480) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]

java.lang.NullPointerException: null
    at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:125) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:192) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:480) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]

java.lang.NullPointerException: null
    at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]

Proxy

08:27:14.083 [pulsar-proxy-io-2-3] WARN  io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.

io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer

08:27:14.090 [pulsar-proxy-io-2-3] WARN  org.apache.pulsar.proxy.server.ProxyConnection - [/10.56.0.21:42612] Got exception NativeIoException : readAddress(..) failed: Connection reset by peer null

08:27:38.090 [pulsar-proxy-io-2-8] WARN  io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.

io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer

08:27:38.090 [pulsar-proxy-io-2-8] WARN  org.apache.pulsar.proxy.server.ProxyConnection - [/10.56.0.17:51340] Got exception NativeIoException : readAddress(..) failed: Connection reset by peer null

When inspecting the subscription with the pulsar-admin CLI tool, we see no consumers at all despite having 3 services running (all using your Pulsar Golang library, the native one, not cgo):

$ bin/pulsar-admin topics stats SpaceEvents | jq '.subscriptions["cloud-pulsar-tester"]' -c | jq

{
  "msgRateOut": 0,
  "msgThroughputOut": 0,
  "bytesOutCounter": 0,
  "msgOutCounter": 0,
  "msgRateRedeliver": 0,
  "chuckedMessageRate": 0,
  "msgBacklog": 181626,
  "msgBacklogNoDelayed": 181626,
  "blockedSubscriptionOnUnackedMsgs": false,
  "msgDelayed": 0,
  "unackedMessages": 0,
  "type": "Key_Shared",
  "msgRateExpired": 0,
  "lastExpireTimestamp": 0,
  "lastConsumedFlowTimestamp": 1598516857278,
  "lastConsumedTimestamp": 0,
  "lastAckedTimestamp": 0,
  "consumers": [],
  "isDurable": true,
  "isReplicated": false
}

If we inspect the Golang µservices' logs at debug level, we see the following output from the underlying Pulsar Golang library (note that these lines keep being printed even after the consumers get stuck):

{"level":"debug","severity":"DEBUG","time":"2020-08-27T09:14:12Z","topic":"SpaceEvents"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}

We are also getting these from your Pulsar lib:

{ msgID: { partition: -1 ledgerId: 389 entryId: 6196 } name: "cloud-pulsar-tester-se-c2-f5lgn" topic: "persistent://public/default/SpaceEvents" subscription: "cloud-pulsar-tester" validationError: 2 consumerID: 1 level: "error" }
{ local_addr: { Zone: "" IP: "10.56.2.26" Port: 46050 } error: "corrupted message" consumerID: 1 remote_addr: { Path: "" Fragment: "" RawFragment: "" Opaque: "" ForceQuery: false RawQuery: "" Scheme: "pulsar" Host: "pulsar-proxy.pulsar:6650" User: null RawPath: "" } level: "error" }

These also show up with v0.2.0, which was released ~13 hours ago.

Sorry, I was accidentally dropping the message attribute from your Golang library's logs when converting them to the Stackdriver format. These should make more sense:

{ message: "Received too big frame size. size=1869573421" level: "warn" local_addr: { IP: "10.56.0.34" Port: 60946 Zone: "" } remote_addr: { Opaque: "" User: null RawFragment: "" RawPath: "" Host: "pulsar-proxy.pulsar:6650" Path: "" Scheme: "pulsar" RawQuery: "" Fragment: "" ForceQuery: false } }
{ message: "Failed to write on connection" error: "write tcp 10.56.0.34:60946->10.0.1.68:6650: use of closed network connection" level: "warn" local_addr: { Zone: "" Port: 60946 IP: "10.56.0.34" } remote_addr: { Path: "" Host: "pulsar-proxy.pulsar:6650" RawFragment: "" Opaque: "" Scheme: "pulsar" User: null RawQuery: "" ForceQuery: false RawPath: "" Fragment: "" } }

After further investigation I came across two scenarios, which may be related.

Silent broker scenario

  • all consumers are connected, with PING and PONG responses travelling over the wire
  • the stats tool reports several messages in the msgBacklog for the given topic/subscription and shows all the consumers as connected, but with a msgRateOut of 0
  • looking at the Pulsar binary protocol exchange, everything goes smoothly for all consumers (even after a restart) up to the flow stage, and then they hang forever waiting for a MESSAGE command that never comes: they connect to the broker, send the 1000 permits and from then on receive only pings, no messages whatsoever
  • killing the Pulsar proxy or restarting the consumers does not help
  • restarting the brokers fixes the issue
  • when trying with the latest official Python client (2.6.1), it just dies as shown below
2020-08-31 08:37:07.728 INFO  [139696078108480] Client:88 | Subscribing on Topic :SpaceEvents
2020-08-31 08:37:07.728 INFO  [139696078108480] ConnectionPool:85 | Created connection for pulsar://localhost:6650
2020-08-31 08:37:07.729 INFO  [139696048822016] ClientConnection:343 | [[::1]:57112 -> [::1]:6650] Connected to broker
2020-08-31 08:37:08.286 INFO  [139696048822016] HandlerBase:53 | [persistent://public/default/SpaceEvents, cloud-pulsar-tester, 0] Getting connection from pool
2020-08-31 08:37:08.426 INFO  [139696048822016] ConnectionPool:85 | Created connection for pulsar://10.56.3.23:6650
2020-08-31 08:37:08.427 INFO  [139696048822016] ClientConnection:345 | [[::1]:57114 -> [::1]:6650] Connected to broker through proxy. Logical broker: pulsar://10.56.3.23:6650
2020-08-31 08:37:08.968 WARN  [139696048822016] ClientConnection:947 | [[::1]:57114 -> [::1]:6650] Received error response from server: UnknownError -- req_id: 0
2020-08-31 08:37:08.968 ERROR [139696048822016] ConsumerImpl:242 | [persistent://public/default/SpaceEvents, cloud-pulsar-tester, 0] Failed to create consumer: UnknownError
Traceback (most recent call last):
  File "main.py", line 4, in <module>
    consumer = client.subscribe('SpaceEvents', 'cloud-pulsar-tester')
  File "/home/francesco/.local/lib/python3.8/site-packages/pulsar/__init__.py", line 655, in subscribe
    c._consumer = self._client.subscribe(topic, subscription_name, conf)
Exception: Pulsar error: UnknownError
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:1387 | [[::1]:57114 -> [::1]:6650] Connection closed
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:1387 | [[::1]:57112 -> [::1]:6650] Connection closed
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:238 | [[::1]:57114 -> [::1]:6650] Destroyed connection
2020-08-31 08:37:08.974 INFO  [139696078108480] ClientConnection:238 | [[::1]:57112 -> [::1]:6650] Destroyed connection

This is the Python code I used:

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('SpaceEvents', 'cloud-pulsar-tester')


def consume():
    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
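        except Exception:
            # Message failed to be processed; ask the broker to redeliver it
            consumer.negative_acknowledge(msg)


if __name__ == '__main__':
    try:
        consume()
    finally:
        # consume() never returns normally, so close the client when it is interrupted
        client.close()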

And the requirements.txt:

pulsar-client==2.6.1
apache-bookkeeper-client==4.11.0
grpcio<1.26.0
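The flow stage in the silent broker scenario can be pictured with a toy model of permit-based flow control: the consumer grants permits with a FLOW command and the broker may push at most that many messages. PING/PONG keepalives are handled at the connection level, independently of dispatch, so a dispatcher that stops reading looks exactly like a healthy but idle connection. The `dispatcher` type and its methods are hypothetical; this is not the broker's implementation.

```go
package main

import "fmt"

// dispatcher is a toy model of one subscription's dispatch state.
type dispatcher struct {
	backlog int
	permits int
	readOK  bool // false models the broker failing to read entries
}

// flow adds permits, as a FLOW command would.
func (d *dispatcher) flow(n int) { d.permits += n }

// dispatch sends as many messages as permits and backlog allow. If reads
// fail, nothing is sent and the permits stay outstanding, so the consumer
// keeps waiting for a MESSAGE that never arrives.
func (d *dispatcher) dispatch() int {
	if !d.readOK {
		return 0
	}
	n := d.permits
	if d.backlog < n {
		n = d.backlog
	}
	d.permits -= n
	d.backlog -= n
	return n
}

func main() {
	healthy := &dispatcher{backlog: 181626, readOK: true}
	healthy.flow(1000)
	sent := healthy.dispatch()
	fmt.Println("healthy: sent", sent, "permits left", healthy.permits)

	silent := &dispatcher{backlog: 181626, readOK: false}
	silent.flow(1000)
	sent = silent.dispatch()
	fmt.Println("silent: sent", sent, "permits left", silent.permits)
}
```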

Possible acking deadlock (Golang client)

  • all consumers report PING and PONG responses in their logs (verbosity set to trace)
  • the stats tool reports no consumers at all, even though all clients print the PING/PONG successfully in their logs
  • the stats tool reports a msgBacklog greater than zero, so there are messages waiting to be processed
  • by getting a full goroutine stack dump I was able to determine that all consumers are stuck here
  • to be 100% sure that I wasn't simply catching the service mid-ack every time I took the goroutine dump, I modified the Golang client to add a ticker that logs Trying to ack whenever an ack takes more than 150ms

The last bullet point means that I changed this code:

func (pc *partitionConsumer) runEventsLoop() {
    defer func() {
        pc.log.Debug("exiting events loop")
    }()
    for {
        select {
        case <-pc.closeCh:
            return
        case i := <-pc.eventsCh:
            switch v := i.(type) {
            case *ackRequest:
                pc.internalAck(v)
            // ... other event types elided
            }
        }
    }
}

To this:

func (pc *partitionConsumer) runEventsLoop() {
    defer func() {
        pc.log.Debug("exiting events loop")
    }()
    for {
        select {
        case <-pc.closeCh:
            return
        case i := <-pc.eventsCh:
            switch v := i.(type) {
            case *ackRequest:
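                // Debug-only watchdog: log every 150ms for as long as internalAck is still running
                ctx, cancel := context.WithCancel(context.Background())
                go func(v *ackRequest) {
                    ticker := time.NewTicker(150 * time.Millisecond)
                    defer ticker.Stop()
                    for {
                        select {
                        case <-ctx.Done():
                            return
                        case <-ticker.C:
                            // (len - cap) of eventsCh shows whether the events queue is backing up
                            pc.log.Infof("Trying to ack %+v (%d - %d)", v.msgID, len(pc.eventsCh), cap(pc.eventsCh))
                        }
                    }
                }(v)

                pc.internalAck(v)
                cancel()
            // ... other event types elided
            }
        }
    }
}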

If the ack completes within 150ms we don't see any logs. The problem is that now all consumers are stuck in a never-ending loop, just printing:

{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}

This has been going on for several hours and the message ID of the log entry is always the same until I kill the consumer.

Could it be that, given that eventsCh is a buffered channel with capacity 3, when a connection gets closed because a message frame is too big, runEventsLoop() never gets to process the *connectionClosed event because there are already at least 3 in-flight ack requests?

In other words: 3 ack requests could already be keeping the channel full when the connection gets closed. The acks can't be processed because the connection is closed, and we can't reconnect to the broker because, with the channel full, the *connectionClosed event can't be pushed into eventsCh and therefore never gets processed. Deadlock?

In support of this theory I can see this in the logs right before I get the never ending Trying to ack loop:

{"error":"write tcp [::1]:35530-\u003e[::1]:6650: use of closed network connection","level":"warn","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Failed to write on connection","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"WARNING","time":"2020-08-31T17:47:23+02:00"}
{"level":"debug","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Write data: 25","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"DEBUG","time":"2020-08-31T17:47:23+02:00"}
{"error":"write tcp [::1]:35530-\u003e[::1]:6650: use of closed network connection","level":"warn","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Failed to write on connection","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"WARNING","time":"2020-08-31T17:47:23+02:00"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:23+02:00","topic":"persistent://public/default/SpaceEvents"}
{"level":"info","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Connection closed","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"INFO","time":"2020-08-31T17:47:23+02:00"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:23+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}

Also, by analyzing the stack dump, I can see that one of the 21 running goroutines is stuck waiting here:

1 @ 0x449d3b 0x41229f 0x412095 0xc5b3f5 0xc46394 0xc4ceb9 0xc41f5e 0xc4ccc6 0x47d571
#   0xc5b3f4    github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).ConnectionClosed+0x64    /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/consumer_partition.go:567
#   0xc46393    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).Close+0x583        /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:751
#   0xc4ceb8    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run.func1+0x168    /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:363
#   0xc41f5d    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run+0x2bd      /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:369
#   0xc4ccc5    github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1+0x85   /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:231
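For anyone trying to capture the same information: a goroutine dump in the grouped format above (a "count @ addresses" line followed by "#" frames) is what Go's runtime/pprof package writes for the goroutine profile at debug level 1. A minimal sketch, where `goroutineDump` is a hypothetical helper name:

```go
package main

import (
	"bytes"
	"fmt"
	"os"
	"runtime/pprof"
)

// goroutineDump returns the stacks of all live goroutines as text.
// debug=1 groups goroutines that share an identical stack ("N @ 0x...");
// debug=2 prints every goroutine separately, in the style of a panic trace.
func goroutineDump(debug int) (string, error) {
	var buf bytes.Buffer
	if err := pprof.Lookup("goroutine").WriteTo(&buf, debug); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	dump, err := goroutineDump(1)
	if err != nil {
		fmt.Fprintln(os.Stderr, "dump failed:", err)
		os.Exit(1)
	}
	fmt.Print(dump)
}
```

In a long-running service the same profile can be served over HTTP by importing net/http/pprof and requesting /debug/pprof/goroutine?debug=1 (or debug=2 for one trace per goroutine). Sending SIGQUIT also prints all goroutine stacks, but it terminates the process.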

Killing the consumer helps in this case, but it eventually gets stuck again trying to ack some other message.

We could potentially look into the second scenario, but we have no clue whatsoever about the first one and we have no Java expertise. Could someone please look a bit more into this and tell us whether you need more information? Thanks.

@fracasula Looks like the broker error log is the same as https://github.com/streamnative/pulsar/issues/1366, which I ran into before while working on some performance tests.

Also, @BewareMyPower, would you please help take a look at the C++ error stack?

@jiazhai Looks like the cpp client's error is just caused by the broker.

// ClientConnection.cc
                case BaseCommand::ERROR: {
                    const CommandError& error = incomingCmd_.error();
                    Result result = getResult(error.error());
                    LOG_WARN(cnxString_ << "Received error response from server: " << result
                                        << " -- req_id: " << error.request_id());

The broker gave an unknown ServerError or just an UnknownError when processing the subscribe command.

@wolfstudy, please take a look at the Go client issue. It looks like a command is not handled well in the protocol.

@fracasula I have pushed a new docker image to the docker hub https://hub.docker.com/layers/streamnative/pulsar/2.6.1-sn-hotfix-1/images/sha256-6ad564327342355d57fa12ba5a2f35ca84957c9949313524095a58623cfe0834?context=explore

This image is based on 2.6.1 and contains https://github.com/streamnative/pulsar/pull/1436. You can upload the broker log file if you reproduce the problem.

@codelipenghui thanks! I'll give it a try :+1:

@codelipenghui I replaced apachepulsar/pulsar:2.6.1 with streamnative/pulsar:2.6.1-sn-hotfix-1 in my Kubernetes configuration but now I have the pods failing on startup with this error:

/usr/bin/env: ‘python’: No such file or directory

Here's an excerpt of my configuration from the Pulsar broker deployment:

containers:
- args:
  - |
    bin/apply-config-from-env.py conf/broker.conf && bin/apply-config-from-env.py conf/pulsar_env.sh && bin/apply-config-from-env.py conf/client.conf && bin/gen-yml-from-env.py conf/functions_worker.yml && exec bin/pulsar broker
  command:
  - sh
  - -c
  env:
  - name: advertisedAddress
    valueFrom:
      fieldRef:
        apiVersion: v1
        fieldPath: status.podIP
  envFrom:
  - configMapRef:
      name: pulsar-broker
  image: streamnative/pulsar:2.6.1-sn-hotfix-1
  imagePullPolicy: IfNotPresent

@fracasula Oh, I see. I will build a new image for you.

@fracasula You can try this one https://hub.docker.com/layers/streamnative/pulsar/2.6.1-sn-hotfix-2/images/sha256-765a82e1601e64cdb8ae8f8d7bd38a4ba936cf5f16557c7f7bb36904813bf0ff?context=explore

@codelipenghui I got this coming from the broker:

11:35:08.153 [main] ERROR org.apache.pulsar.PulsarBrokerStarter - Failed to start pulsar service.
org.apache.pulsar.broker.PulsarServerException: org.apache.pulsar.broker.PulsarServerException: java.io.IOException: No offloader found for driver 'google-cloud-storage'. Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`.
    at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:587) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
    at org.apache.pulsar.PulsarBrokerStarter$BrokerStarter.start(PulsarBrokerStarter.java:280) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
    at org.apache.pulsar.PulsarBrokerStarter.main(PulsarBrokerStarter.java:349) [org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
Caused by: org.apache.pulsar.broker.PulsarServerException: java.io.IOException: No offloader found for driver 'google-cloud-storage'. Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`.
    at org.apache.pulsar.broker.PulsarService.createManagedLedgerOffloader(PulsarService.java:912) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
    at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:458) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
    ... 2 more
Caused by: java.io.IOException: No offloader found for driver 'google-cloud-storage'. Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`.
    at org.apache.bookkeeper.mledger.offload.Offloaders.getOffloaderFactory(Offloaders.java:42) ~[org.apache.pulsar-managed-ledger-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
    at org.apache.pulsar.broker.PulsarService.createManagedLedgerOffloader(PulsarService.java:893) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
    at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:458) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
    ... 2 more

BookKeeper, ZooKeeper and the proxy seem fine, though.

@fracasula Interesting... did you specify an offloader, e.g. via managedLedgerOffloadDriver in broker.conf? The default managedLedgerOffloadDriver is null, so the broker should only look for the google-cloud-storage driver if it has been configured.

@codelipenghui Yeah, I think I did; in fact it works with the usual apachepulsar/pulsar:2.6.1 image. Here's an excerpt from the deployment:

      containers:
      - args:
        - |
          bin/apply-config-from-env.py conf/broker.conf && bin/apply-config-from-env.py conf/pulsar_env.sh && bin/apply-config-from-env.py conf/client.conf && bin/gen-yml-from-env.py conf/functions_worker.yml && exec bin/pulsar broker
        command:
        - sh
        - -c
        envFrom:
        - configMapRef:
            name: pulsar-broker

As you can see, it applies the configuration from the environment variables, which are defined in the ConfigMap below:

apiVersion: v1
data:
  PULSAR_EXTRA_OPTS: -Dpulsar.log.root.level=trace
  PULSAR_GC: '"-XX:+UseG1GC -XX:MaxGCPauseMillis=10"'
  PULSAR_LOG_LEVEL: trace
  PULSAR_LOG_ROOT_LEVEL: trace
  PULSAR_MEM: '"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=2g -Dio.netty.leakDetectionLevel=disabled
    -Dio.netty.recycler.linkCapacity=1024 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions
    -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32
    -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB -XX:+ExitOnOutOfMemoryError
    -XX:+PerfDisableSharedMem"'
  allowAutoTopicCreationType: non-partitioned
  autoSkipNonRecoverableData: "true"
  backlogQuotaDefaultRetentionPolicy: producer_exception
  clusterName: pulsar
  configurationStoreServers: pulsar-zookeeper-0.pulsar-zookeeper,pulsar-zookeeper-1.pulsar-zookeeper,pulsar-zookeeper-2.pulsar-zookeeper
  deduplicationEnabled: "true"
  defaultRetentionSizeInMB: "-1"
  defaultRetentionTimeInMinutes: "-1"
  exposeConsumerLevelMetricsInPrometheus: "true"
  exposeTopicLevelMetricsInPrometheus: "true"
  gcsManagedLedgerOffloadBucket: netdata-pulsar-debugging
  gcsManagedLedgerOffloadRegion: ""
  gcsManagedLedgerOffloadServiceAccountKeyFile: /pulsar/gcp-service-account/key
  managedLedgerDefaultAckQuorum: "2"
  managedLedgerDefaultEnsembleSize: "2"
  managedLedgerDefaultWriteQuorum: "2"
  managedLedgerOffloadAutoTriggerSizeThresholdBytes: "32000000"
  managedLedgerOffloadDeletionLagMs: "7200000"
  managedLedgerOffloadDriver: google-cloud-storage
  maxConsumersPerSubscription: "500"
  maxConsumersPerTopic: "500"
  maxProducersPerTopic: "100"
  zookeeperServers: pulsar-zookeeper-0.pulsar-zookeeper,pulsar-zookeeper-1.pulsar-zookeeper,pulsar-zookeeper-2.pulsar-zookeeper
kind: ConfigMap
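As far as I understand it, the environment-based setup works roughly like this: for every key=value line already present in the .conf file, an environment variable with the same name replaces the value. The Go sketch below only approximates that idea for illustration; the real logic lives in the Python script bin/apply-config-from-env.py and may handle cases (such as adding keys that are not in the file) that this hypothetical `applyEnv` does not.

```go
package main

import (
	"fmt"
	"strings"
)

// applyEnv rewrites "key=value" lines of a properties-style config,
// replacing the value of any key that also appears in env.
// Comments, blank lines and keys absent from env are left untouched;
// keys present in env but missing from the config are NOT added.
// strings.Cut requires Go 1.18 or newer.
func applyEnv(conf string, env map[string]string) string {
	lines := strings.Split(conf, "\n")
	for i, line := range lines {
		trimmed := strings.TrimSpace(line)
		if trimmed == "" || strings.HasPrefix(trimmed, "#") {
			continue
		}
		key, _, found := strings.Cut(trimmed, "=")
		if !found {
			continue
		}
		key = strings.TrimSpace(key)
		if v, ok := env[key]; ok {
			lines[i] = key + "=" + v
		}
	}
	return strings.Join(lines, "\n")
}

func main() {
	conf := "# offload settings\nmanagedLedgerOffloadDriver=\nclusterName=standalone\n"
	env := map[string]string{
		"managedLedgerOffloadDriver": "google-cloud-storage",
		"clusterName":                "pulsar",
	}
	fmt.Print(applyEnv(conf, env))
}
```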

@fracasula Interesting, I think 2.6.1 does not include any offloader drivers by default. If you are using an offloader, you can try this image: https://hub.docker.com/layers/streamnative/pulsar-all/2.6.1-sn-hotfix-2/images/sha256-2429d29d86e8eac252d055ed10541353f3f89d9569512048bebea0b35eb71d01?context=explore

WARN[0034] Received too big frame size. size=234969386

What version of the Go client are you using? This error occurs because the default maximum message size allowed by Pulsar is 5 MB; if a frame exceeds this value, the error above is thrown. Starting with version 0.2.0 of the Go client, users can configure the max frame size. You can try updating the Go client to 0.2.0 and configuring the max message size in broker.conf to see if that solves your problem.

@wolfstudy thanks, we'll take that into consideration. Out of curiosity, how come the frame size is not checked when the message is being published? That way we would get an error when trying to write a message that is too big, instead of when consuming it, when it may be too late.

Out of curiosity, how come the frame size is not checked when the message is being published?

@fracasula Good question.

In https://github.com/apache/pulsar-client-go/pull/263 we check the maxMessageSize value, and when a sent message is larger than maxMessageSize, we try to close the connection.

@fracasula Does the new image work for you? If there are any problems, please ping me.

@codelipenghui yeah, the last image is definitely working, thanks! I hadn't gotten back to you because I'm off until next week and had trouble replicating the "silent broker" issue; it's quite random and hard to replicate. I think most of the time our stuck consumers were due to client issues (Golang deadlock).

I think most of the time our stuck consumers were due to client issues (Golang deadlock).

Hello @fracasula, can you provide more details about the Golang deadlock?

https://github.com/apache/pulsar-client-go/pull/366

Hi @fracasula

Is this issue affecting the Go client only, or other clients as well?

Also, make sure that you are not affected by this issue https://github.com/apache/pulsar/pull/8024, which was introduced in 2.6.0.

Is this issue affecting the Go client only, or other clients as well?

Hi @alexku7,

If you run the test I posted and set up a few breakpoints, you'll see that this is a Golang library issue. I tried with Python multiple times and it never deadlocked while acking.

Also, make sure that you are not affected by this issue #8024 which was introduced in 2.6.0

I'm running on the image provided by @codelipenghui. How do I make sure that I'm not affected by that issue you're mentioning? Also, how is that related to a client deadlock? Are you referring to the "silent broker" scenario I described above instead?

Hi @fracasula
Honestly, I didn't run the tests. I just wanted to make sure that our Java client is not affected :)

Regarding https://github.com/apache/pulsar/pull/8024, I mentioned it because you are describing symptoms very similar to ones we have suffered from a lot: the same stuck consumer with a Key_Shared subscription, which most likely is not resolved by a consumer restart. Sometimes you are trying to resolve one issue that is actually masked by another, similar issue, and this is very misleading.

Anyway, https://github.com/apache/pulsar/pull/8024 was only resolved last Friday, in 2.6.1-sn-3, so I think you are most likely working with an earlier image.

Honestly, I didn't run the tests. I just wanted to make sure that our Java client is not affected :)

Cool, please provide feedback here if possible so that I can start working on a fix :+1:

Anyway, #8024 was only resolved last Friday, in 2.6.1-sn-3, so I think you are most likely working with an earlier image.

I'm running on this one: https://hub.docker.com/layers/streamnative/pulsar-all/2.6.1-sn-hotfix-2/images/sha256-2429d29d86e8eac252d055ed10541353f3f89d9569512048bebea0b35eb71d01?context=explore

Can you please provide the latest image with the hotfix (additional logs for the NPE), so I can see whether I can replicate the "silent broker" issue with it?

@fracasula 2.6.1-sn-3 is based on 2.6.1-sn-hotfix-2 and additionally fixes #8024, so if you ran into the issue that #8024 fixed, you can upgrade to 2.6.1-sn-3.

Hi guys, about the Golang deadlock: I'm able to replicate it with the latest image too (i.e. streamnative/pulsar-all:2.6.1-sn-3). It still shows the client stuck while acking, but as @alexku7 suggested, it may not just be a client issue. By further developing my test I was able to overlay the client and server logs, and I consistently get this on the server side each time the client gets stuck:

time="2020-09-17T09:15:46+02:00" level=info msg="Error reading from connection" error="Short read when reading frame size: read tcp [::1]:48104->[::1]:32771: use of closed network connection" local_addr="[::1]:48104" remote_addr="pulsar://localhost:32771"
07:15:46.482 [pulsar-io-51-16] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.17.0.1:38668
07:15:46.482 [pulsar-io-51-16] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/topic-name-61b9697b6f087aca, name=subscription-name-756c1ab9019bbcdc}, consumerId=1, consumerName=consumer-name-249de6d4895d0e15, address=/172.17.0.1:38668} with pending 200 acks
07:15:46.486 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca-subscription-name-756c1ab9019bbcdc] Rewind from 13:700 to 13:400
    consumer_partition_integration_test.go:112: [2020-09-17 09:15:46.62477658 +0200 CEST m=+181.898034767] Trying to ack 13:426:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446798277 +0200 CEST m=+171.004720824] Trying to ack 13:1318:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.44681232 +0200 CEST m=+171.004734868] Trying to ack 13:1332:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.44682636 +0200 CEST m=+171.004748901] Trying to ack 13:1334:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446840804 +0200 CEST m=+171.004763351] Trying to ack 13:1337:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446756504 +0200 CEST m=+171.004679062] Trying to ack 13:1324:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446845301 +0200 CEST m=+171.004767844] Trying to ack 13:1338:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446863899 +0200 CEST m=+171.004786443] Trying to ack 13:1340:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446876519 +0200 CEST m=+171.004799061] Trying to ack 13:1341:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446783149 +0200 CEST m=+171.004705702] Trying to ack 13:1328:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446772341 +0200 CEST m=+171.004694903] Trying to ack 13:1322:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446789742 +0200 CEST m=+171.004712290] Trying to ack 13:1321:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446902955 +0200 CEST m=+171.004825503] Trying to ack 13:1345:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.44692216 +0200 CEST m=+171.004844705] Trying to ack 13:1347:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446938844 +0200 CEST m=+171.004861396] Trying to ack 13:1348:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446957473 +0200 CEST m=+171.004880021] Trying to ack 13:1351:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446980957 +0200 CEST m=+171.004903502] Trying to ack 13:1349:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446985213 +0200 CEST m=+171.004907763] Trying to ack 13:1353:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446798504 +0200 CEST m=+171.004721046] Trying to ack 13:1323:0
    consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446791901 +0200 CEST m=+171.004714458] Trying to ack 13:1326:0
...

Do you think that's something that could cause the consumer to hang forever waiting for those pending acks to be processed?

I ran it multiple times; here's the output from the second run:

08:33:20.866 [pulsar-io-51-7] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:40022] Created subscription on topic persistent://public/default/topic-name-61b9697b6f087aca / subscription-name-2da7ed37505244dc
08:33:20.960 [pulsar-io-51-7] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.17.0.1:40022
08:33:20.960 [pulsar-io-51-7] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/topic-name-61b9697b6f087aca, name=subscription-name-2da7ed37505244dc}, consumerId=1, consumerName=consumer-name-748c24a16b99354a, address=/172.17.0.1:40022} with pending 148 ack

Third:

08:34:45.073 [pulsar-io-51-8] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /172.17.0.1:40030
08:34:45.074 [pulsar-io-51-8] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:40030] Subscribing on topic persistent://public/default/topic-name-61b9697b6f087aca / subscription-name-3afa9641ff625519
08:34:45.074 [pulsar-io-51-8] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca] Cursor subscription-name-3afa9641ff625519 recovered to position 41:99999
08:34:45.074 [pulsar-io-51-8] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/topic-name-61b9697b6f087aca] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[112, 117, 98, 108, 105, 99, 47, 100, 101, 102, 97, 117, 108, 116, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 116, 111, 112, 105, 99, 45, 110, 97, 109, 101, 45, 54, 49, 98, 57, 54, 57, 55, 98, 54, 102, 48, 56, 55, 97, 99, 97], pulsar/cursor=[115, 117, 98, 115, 99, 114, 105, 112, 116, 105, 111, 110, 45, 110, 97, 109, 101, 45, 51, 97, 102, 97, 57, 54, 52, 49, 102, 102, 54, 50, 53, 53, 49, 57], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
08:34:45.079 [main-EventThread] INFO  org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 45
08:34:45.081 [BookKeeperClientWorker-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [public/default/persistent/topic-name-61b9697b6f087aca] [subscription-name-3afa9641ff625519] Updating cursor info ledgerId=45 mark-delete=41:99999
08:34:45.084 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca] Updated cursor subscription-name-3afa9641ff625519 with ledger id 45 md-position=41:99999 rd-position=41:100000
08:34:45.084 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/topic-name-61b9697b6f087aca] Opened new cursor: ManagedCursorImpl{ledger=public/default/persistent/topic-name-61b9697b6f087aca, name=subscription-name-3afa9641ff625519, ackPos=41:99999, readPos=41:100000}
08:34:45.084 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca-subscription-name-3afa9641ff625519] Rewind from 41:0 to 41:0
08:34:45.085 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic-name-61b9697b6f087aca] There are no replicated subscriptions on the topic
08:34:45.085 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic-name-61b9697b6f087aca][subscription-name-3afa9641ff625519] Created new subscription for 1
08:34:45.085 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:40030] Created subscription on topic persistent://public/default/topic-name-61b9697b6f087aca / subscription-name-3afa9641ff625519
08:34:45.086 [pulsar-io-51-8] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.17.0.1:40030
08:34:45.086 [pulsar-io-51-8] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/topic-name-61b9697b6f087aca, name=subscription-name-3afa9641ff625519}, consumerId=1, consumerName=consumer-name-3f0be6cd39d4812b, address=/172.17.0.1:40030} with pending 0 acks
08:34:45.087 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca-subscription-name-3afa9641ff625519] Rewind from 41:1 to 41:0
08:34:45.187 [pulsar-io-51-9] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /172.17.0.1:40034
08:34:45.190 [pulsar-io-51-9] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:40034] Subscribing on topic persistent://public/default/topic-name-61b9697b6f087aca / subscription-name-3afa9641ff625519
08:34:45.190 [pulsar-io-51-9] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca-subscription-name-3afa9641ff625519] Rewind from 41:0 to 41:0
08:34:45.190 [pulsar-io-51-9] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic-name-61b9697b6f087aca] There are no replicated subscriptions on the topic
08:34:45.191 [pulsar-io-51-9] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic-name-61b9697b6f087aca][subscription-name-3afa9641ff625519] Created new subscription for 1
08:34:45.191 [pulsar-io-51-9] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:40034] Created subscription on topic persistent://public/default/topic-name-61b9697b6f087aca / subscription-name-3afa9641ff625519
08:34:45.286 [pulsar-io-51-9] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.17.0.1:40034
08:34:45.286 [pulsar-io-51-9] INFO  org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/topic-name-61b9697b6f087aca, name=subscription-name-3afa9641ff625519}, consumerId=1, consumerName=consumer-name-3f0be6cd39d4812b, address=/172.17.0.1:40034} with pending 64 acks
08:34:45.286 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca-subscription-name-3afa9641ff625519] Rewind from 41:3000 to 41:2836
08:34:50.409 [pulsar-web-69-13] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [17/Sep/2020:08:34:50 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false HTTP/1.1" 200 1283 "-" "Pulsar-Java-v2.6.1-sn-3" 7

https://github.com/apache/pulsar-client-go/pull/376 will fix the consumer blocking issue in the Go client.
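One general pattern for breaking this kind of cycle (not necessarily what the linked PR does) is to deliver the close notification on a dedicated channel with a one-slot buffer and a non-blocking send, so the connection goroutine never waits on a full eventsCh. A sketch with hypothetical names (`consumer`, `connClosedCh`, `notifyConnectionClosed`):

```go
package main

import "fmt"

// consumer is a hypothetical, minimal stand-in for a partition consumer.
type consumer struct {
	eventsCh     chan interface{} // acks and other requests (may be full)
	connClosedCh chan struct{}    // dedicated close signal with a buffer of 1
}

func newConsumer() *consumer {
	return &consumer{
		eventsCh:     make(chan interface{}, 3),
		connClosedCh: make(chan struct{}, 1),
	}
}

// notifyConnectionClosed never blocks: if a close signal is already
// pending, a second one adds nothing and is dropped.
func (c *consumer) notifyConnectionClosed() {
	select {
	case c.connClosedCh <- struct{}{}:
	default:
	}
}

func main() {
	c := newConsumer()
	for i := 0; i < cap(c.eventsCh); i++ {
		c.eventsCh <- i // fill the events channel completely
	}
	c.notifyConnectionClosed() // returns immediately despite the full eventsCh
	c.notifyConnectionClosed() // coalesced with the pending signal
	fmt.Println("pending close signals:", len(c.connClosedCh))
}
```

This only removes the wait on the connection side; the events loop still has to select on connClosedCh and must not block indefinitely inside an ack for the signal to be acted on.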

Closing this issue since both problems have been fixed. If any new problems come up, let's open a new issue to track them.
