Describe the bug
A consumer gets stuck after seeing an error that most probably comes from the cgo layer underneath:
terminate called after throwing an instance of 'std::bad_alloc'
what(): std::bad_alloc
To Reproduce
Steps to reproduce the behavior:
bin/pulsar-admin topics reset-cursor persistent://public/default/SpaceEvents -s cloud-notifications-service -t 999w
Expected behavior
I would expect the consumer not to block and not to raise any std::bad_alloc error.
Screenshots
No screenshots available.
Additional context
I cross referenced the logs of our consumer to see what happens on the Pulsar side when we get the bad alloc errors and we were able to find some interesting exceptions that seem to happen concomitantly with the bad alloc errors (see attached report).
Some errors are particularly interesting and make me think that we might have issues when reading entries from a ledger (bookkeeper). Is there anything you can suggest on how to better debug this? Thanks!
I just tried to consume from the same topic with the new Golang library (I noticed that you deprecated the cgo one). With the pure Golang one, instead of getting stuck with std::bad_alloc, it fails and returns this:
WARN[0034] Received too big frame size. size=234969386 local_addr="[::1]:44310" remote_addr="pulsar://localhost:6650"
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x68 pc=0xadee3d]
goroutine 409 [running]:
github.com/apache/pulsar-client-go/pulsar/internal.(*MessageReader).readChecksum(0xc00067d8f8, 0x0, 0xc00067d6d8, 0x4160bd)
/home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/commands.go:78 +0x2d
github.com/apache/pulsar-client-go/pulsar/internal.(*MessageReader).ReadMessageMetadata(0xc00067d8f8, 0xc00067d910, 0x0, 0x0)
/home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/commands.go:95 +0x40
github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).MessageReceived(0xc00010a4e0, 0xc0003e2f40, 0x0, 0x0, 0x1, 0x0)
/home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/consumer_partition.go:370 +0xa8
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleMessage(0xc0002e9560, 0xc0003e2f40, 0x0, 0x0)
/home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/connection.go:543 +0x142
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand(0xc0002e9560, 0xc0000188c0, 0x0, 0x0)
/home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/connection.go:457 +0x24d
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc0002e9560)
/home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/connection.go:340 +0x304
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1(0xc0002e9560)
/home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/connection.go:204 +0x59
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start
/home/francesco/go/pkg/mod/github.com/apache/[email protected]/pulsar/internal/connection.go:201 +0x3f
make: *** [Makefile:46: run] Error 2
According to the log line your library emits right before the panic, it received a frame that is too big. Any ideas?
I realized that 2.6.0 is out and we're still running 2.5.1. We'll upgrade and come back to you, sorry for the inconvenience.
We upgraded this morning to 2.6.0 in production with the latest Pulsar Golang client library (i.e. no cgo). Of course we tested our application on both testing and staging before going live and we didn't find any compatibility issues. Unfortunately, as soon as we deployed the new version, the scenario described above happened on a much larger scale, affecting several consumers at the same time.
The backlog started to grow quite a bit, so we decided to downgrade back to 2.5.1. That helped a bit, but some consumers were definitely still stuck. We then upgraded again to 2.6.0, this time with autoSkipNonRecoverableData set to true. With these settings, and with jobs that periodically restart our pods in Kubernetes, we managed to get it more stable.
Of course we would like to avoid having to kill our pods on a regular basis so it would be nice to find a solution.
When looking at the logs on the client side, there's nothing relevant coming from the Pulsar library. It just reports logs like:
{"level":"debug","name":"cloud-nodes-service-3681597d6aa5773d","severity":"DEBUG","subscription":"cloud-nodes-service","time":"2020-08-11T07:34:27Z","topic":"persistent://public/default/AlarmEventsUpdated"}
{"cnx":"10.1.15.135:40246 -\u003e 10.2.7.212:6650","level":"debug","severity":"DEBUG","time":"2020-08-11T07:34:27Z","topic":"persistent://public/default/AlarmEventsUpdated"}
{"cnx":"10.1.15.135:40246 -\u003e 10.2.7.212:6650","level":"info","producer_name":"cloud-nodes-service-3681597d6aa5773d","severity":"INFO","time":"2020-08-11T07:34:27Z","topic":"persistent://public/default/AlarmEventsUpdated"}
On the server side (mostly the Pulsar broker) we got plenty of exceptions though, such as:
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:35:58.848 [bookkeeper-ml-workers-OrderedExecutor-1-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/NodeAlarmCountersUpdated] Error opening ledger for reading at position 297169:27536 - org.apache.bookkeeper.mledger.ManagedLedgerException: Unknown exception
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:35:58.848 [bookkeeper-ml-workers-OrderedExecutor-1-0] WARN org.apache.bookkeeper.mledger.impl.OpReadEntry - [public/default/persistent/NodeAlarmCountersUpdated][cloud-nodes-service] read failed from ledger at position:297169:27536 : Unknown exception
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:35:58.848 [bookkeeper-ml-workers-OrderedExecutor-1-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/NodeAlarmCountersUpdated / cloud-nodes-service] Error reading entries at 297169:27536 : Unknown exception, Read Type Normal - Retrying to read in 54.321 seconds
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:00.406 [pulsar-io-23-4] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/NodeInfoUpdated / cloud-spaceroom-service] Retrying read operation
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:00.428 [offloader-OrderedScheduler-1-0] ERROR org.jclouds.http.functions.ParseJson - Error parsing input: null
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker java.lang.NullPointerException: null
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.json.internal.NullFilteringTypeAdapterFactories$MapTypeAdapter.read(NullFilteringTypeAdapterFactories.java:319) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.json.internal.NullFilteringTypeAdapterFactories$MapTypeAdapter.read(NullFilteringTypeAdapterFactories.java:287) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.json.internal.DeserializationConstructorAndReflectiveTypeAdapterFactory$ParameterReader.read(DeserializationConstructorAndReflectiveTypeAdapterFactory.java:275) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.json.internal.DeserializationConstructorAndReflectiveTypeAdapterFactory$DeserializeIntoParameterizedConstructor.read(DeserializationConstructorAndReflectiveTypeAdapterFactory.java:187) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.gson.Gson.fromJson(Gson.java:888) ~[com.google.code.gson-gson-2.8.2.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.gson.Gson.fromJson(Gson.java:853) ~[com.google.code.gson-gson-2.8.2.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.json.internal.GsonWrapper.fromJson(GsonWrapper.java:55) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:82) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:76) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:61) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:41) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.rest.internal.InvokeHttpMethod.invoke(InvokeHttpMethod.java:91) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.rest.internal.InvokeHttpMethod.apply(InvokeHttpMethod.java:74) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.rest.internal.InvokeHttpMethod.apply(InvokeHttpMethod.java:45) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.reflect.FunctionalReflection$FunctionalInvocationHandler.handleInvocation(FunctionalReflection.java:117) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.common.reflect.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:86) ~[com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.sun.proxy.$Proxy122.getObject(Unknown Source) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.googlecloudstorage.blobstore.GoogleCloudStorageBlobStore.getBlob(GoogleCloudStorageBlobStore.java:246) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.blobstore.internal.BaseBlobStore.getBlob(BaseBlobStore.java:217) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.inject.internal.DelegatingInvocationHandler.invoke(DelegatingInvocationHandler.java:37) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.sun.proxy.$Proxy73.getBlob(Unknown Source) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl.open(BlobStoreBackedReadHandleImpl.java:195) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.lambda$readOffloaded$5(BlobStoreManagedLedgerOffloader.java:556) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:00.444 [offloader-OrderedScheduler-1-0] ERROR org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader - Failed readOffloaded:
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker org.jclouds.http.HttpResponseException: Error parsing input: null
or
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.482 [offloader-OrderedScheduler-0-0] ERROR org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader - Failed readOffloaded:
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker org.jclouds.http.HttpResponseException: Error parsing input: null
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker {statusCode=200, message=OK, headers={Server=[UploadServer], ETag=[CNqKo4De+OoCEAE=], X-GUploader-UploadID=[AAANsUn4S2D94evffqRwEGbgp4-4afCAlL11_kSwnURQy8ukjkf4B9-9HFkMFNoBy62PYgKa7ZRFwa-aJB73GpNWRYl4g6Uk8w], Vary=[X-Origin, Origin], Date=[Tue, 11 Aug 2020 07:36:04 GMT]}, payload=[content=true, contentMetadata=[cacheControl=private, max-age=0, must-revalidate, no-transform, contentDisposition=null, contentEncoding=null, contentLanguage=null, contentLength=1234, contentMD5=null, contentType=application/json; charset=UTF-8, expires=Tue Aug 11 07:36:04 UTC 2020], written=false, isSensitive=false]}
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:67) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:41) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.rest.internal.InvokeHttpMethod.invoke(InvokeHttpMethod.java:91) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.rest.internal.InvokeHttpMethod.apply(InvokeHttpMethod.java:74) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.rest.internal.InvokeHttpMethod.apply(InvokeHttpMethod.java:45) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.reflect.FunctionalReflection$FunctionalInvocationHandler.handleInvocation(FunctionalReflection.java:117) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.common.reflect.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:86) ~[com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.sun.proxy.$Proxy122.getObject(Unknown Source) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.googlecloudstorage.blobstore.GoogleCloudStorageBlobStore.getBlob(GoogleCloudStorageBlobStore.java:246) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.blobstore.internal.BaseBlobStore.getBlob(BaseBlobStore.java:217) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.inject.internal.DelegatingInvocationHandler.invoke(DelegatingInvocationHandler.java:37) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.sun.proxy.$Proxy73.getBlob(Unknown Source) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl.open(BlobStoreBackedReadHandleImpl.java:195) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.lambda$readOffloaded$5(BlobStoreManagedLedgerOffloader.java:556) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) [com.google.guava-guava-25.1-jre.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker Caused by: java.lang.NullPointerException
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.json.internal.NullFilteringTypeAdapterFactories$MapTypeAdapter.read(NullFilteringTypeAdapterFactories.java:319) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.json.internal.NullFilteringTypeAdapterFactories$MapTypeAdapter.read(NullFilteringTypeAdapterFactories.java:287) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.json.internal.DeserializationConstructorAndReflectiveTypeAdapterFactory$ParameterReader.read(DeserializationConstructorAndReflectiveTypeAdapterFactory.java:275) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.json.internal.DeserializationConstructorAndReflectiveTypeAdapterFactory$DeserializeIntoParameterizedConstructor.read(DeserializationConstructorAndReflectiveTypeAdapterFactory.java:187) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.gson.Gson.fromJson(Gson.java:888) ~[com.google.code.gson-gson-2.8.2.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at com.google.gson.Gson.fromJson(Gson.java:853) ~[com.google.code.gson-gson-2.8.2.jar:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.json.internal.GsonWrapper.fromJson(GsonWrapper.java:55) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:82) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:76) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker at org.jclouds.http.functions.ParseJson.apply(ParseJson.java:61) ~[?:?]
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker ... 28 more
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.495 [bookkeeper-ml-workers-OrderedExecutor-5-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/AlarmEventsUpdated] Error opening ledger for reading at position 297170:27534 - org.apache.bookkeeper.mledger.ManagedLedgerException: Unknown exception
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.495 [bookkeeper-ml-workers-OrderedExecutor-5-0] WARN org.apache.bookkeeper.mledger.impl.OpReadEntry - [public/default/persistent/AlarmEventsUpdated][cloud-nodes-service] read failed from ledger at position:297170:27534 : Unknown exception
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.495 [bookkeeper-ml-workers-OrderedExecutor-5-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/AlarmEventsUpdated / cloud-nodes-service] Error reading entries at 297170:27534 : Unknown exception, Read Type Normal - Retrying to read in 59.128 seconds
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.935 [pulsar-io-23-3] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/NodeInfoUpdated / cloud-spaceroom-service] Retrying read operation
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:04.954 [offloader-OrderedScheduler-1-0] ERROR org.jclouds.http.functions.ParseJson - Error parsing input: null
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:07.742 [pulsar-io-23-1] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/AlarmEventsUpdated / cloud-nodes-service] Retrying read operation
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:07.746 [pulsar-load-manager-3-1] INFO org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Writing local data to ZooKeeper because maximum change 10.64431369304657% exceeded threshold 10%; time since last report written is 10.0 seconds
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:07.751 [pulsar-ordered-OrderedExecutor-5-0-EventThread] INFO org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x101c63ce3fa0003 local:/10.1.29.4:52068 remoteserver:pulsar-zookeeper-1.pulsar-zookeeper/10.1.34.231:2181 lastZxid:47245067907 xid:29534 sent:29536 recv:30476 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/loadbalance/brokers/10.1.29.4:8080
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker 07:36:07.761 [offloader-OrderedScheduler-0-0] ERROR org.jclouds.http.functions.ParseJson - Error parsing input: null
pulsar-broker-84bf4f59d6-lgzzp pulsar-broker java.lang.NullPointerException: null
Here are the full logs.
Any ideas?
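In case it helps with triage, this is roughly how the broker log can be condensed into a list of failing read positions per topic and subscription. It is only a sketch: the regular expression is written against the "Error reading entries at" lines pasted above, and `failingPositions` and `key` are names invented for the example.

```go
package main

import (
	"bufio"
	"fmt"
	"regexp"
	"strings"
)

// readErr matches dispatcher lines such as:
//   [persistent://public/default/X / sub] Error reading entries at 297169:27536 : ...
var readErr = regexp.MustCompile(`\[persistent://(\S+) / (\S+)\] Error reading entries at (\d+):(\d+)`)

// key identifies one failing position (ledgerId:entryId) for a subscription.
type key struct{ topic, sub, pos string }

// failingPositions counts how often each position fails to be read.
func failingPositions(log string) map[key]int {
	counts := make(map[key]int)
	sc := bufio.NewScanner(strings.NewReader(log))
	// Broker log lines can be long, so raise the scanner's line limit.
	sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for sc.Scan() {
		if m := readErr.FindStringSubmatch(sc.Text()); m != nil {
			counts[key{m[1], m[2], m[3] + ":" + m[4]}]++
		}
	}
	return counts
}

func main() {
	// Two shortened lines taken from the broker log above.
	log := `[persistent://public/default/NodeAlarmCountersUpdated / cloud-nodes-service] Error reading entries at 297169:27536 : Unknown exception
[persistent://public/default/AlarmEventsUpdated / cloud-nodes-service] Error reading entries at 297170:27534 : Unknown exception`

	counts := failingPositions(log)
	fmt.Println(len(counts))
	fmt.Println(counts[key{"public/default/AlarmEventsUpdated", "cloud-nodes-service", "297170:27534"}])
}
```

If the same position keeps failing across retries, that would point at a specific ledger rather than a transient read problem.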
/cc @gaoran10 This looks like the same problem you ran into. Could you please help take a look?
@fracasula Hi, the jclouds NPE problem is similar to issue #7402. Currently, Pulsar 2.6.0 cannot reliably read offloaded data from GCS or offload data to GCS. Perhaps you offloaded the data successfully with Pulsar 2.5.0 and then failed to read it with Pulsar 2.6.0. This issue is already fixed by PR #7435, and Pulsar 2.6.1 will include that PR.
@codelipenghui @gaoran10 We created a clone of the whole production environment, including the offloaded buckets, and even with 2.6.1 we can reproduce the problem consistently. The consumers are still getting stuck, and we're able to unblock them only by manually killing the pod on Kubernetes.
I deployed 3 consumers with a key-shared subscription listening to our SpaceEvents topic. Once all three were running, I reset the cursor for that subscription by running bin/pulsar-admin topics reset-cursor SpaceEvents -s cloud-pulsar-tester -t 99999w. After the reset I could see that the 3 consumers started reading messages. The msgRateOut shown by bin/pulsar-admin topics stats SpaceEvents also confirmed that all 3 were reading messages.
At some point, when there are around 181k messages left in the backlog, the consumers get stuck.
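For reference, this is roughly how the stuck state can be detected from the stats output: a subscription that still has backlog but a zero msgRateOut. The sketch only relies on the `subscriptions`, `msgBacklog` and `msgRateOut` fields of the stats JSON; `stuckSubscriptions` is a made-up helper, and the JSON in `main` is an invented sample (the 181000 backlog mirrors the figure above, while the `other` subscription and its values are hypothetical).

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// subStats holds the two per-subscription fields this sketch cares about.
type subStats struct {
	MsgRateOut float64 `json:"msgRateOut"`
	MsgBacklog int64   `json:"msgBacklog"`
}

// topicStats is a partial view of the topic stats JSON document.
type topicStats struct {
	Subscriptions map[string]subStats `json:"subscriptions"`
}

// stuckSubscriptions returns, in sorted order, the names of subscriptions
// that have a backlog but are not dispatching any messages.
func stuckSubscriptions(raw []byte) ([]string, error) {
	var ts topicStats
	if err := json.Unmarshal(raw, &ts); err != nil {
		return nil, err
	}
	var stuck []string
	for name, s := range ts.Subscriptions {
		if s.MsgBacklog > 0 && s.MsgRateOut == 0 {
			stuck = append(stuck, name)
		}
	}
	sort.Strings(stuck)
	return stuck, nil
}

func main() {
	// Invented sample, not real output from our cluster.
	raw := []byte(`{"subscriptions":{
		"cloud-pulsar-tester":{"msgRateOut":0,"msgBacklog":181000},
		"other":{"msgRateOut":12.5,"msgBacklog":40}}}`)

	stuck, err := stuckSubscriptions(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(stuck)
}
```

A single sample with a zero rate is not conclusive on its own, so the check is more meaningful when it holds across a few consecutive polls.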
The Golang µservices' logs report warnings only. They come from the Pulsar library itself (which uses logrus), not from our code:
{ remote_addr: { ForceQuery: false Host: "pulsar-proxy.pulsar:6650" RawPath: "" User: null Fragment: "" Opaque: "" RawQuery: "" Path: "" Scheme: "pulsar" RawFragment: "" } level: "warn" local_addr: { Port: 44814 IP: "10.56.0.18" Zone: "" } }
{ remote_addr: { RawPath: "" Scheme: "pulsar" Path: "" Opaque: "" RawQuery: "" Host: "pulsar-proxy.pulsar:6650" RawFragment: "" Fragment: "" User: null ForceQuery: false } level: "warn" error: "write tcp 10.56.0.18:44814->10.0.1.68:6650: use of closed network connection" local_addr: { Port: 44814 Zone: "" IP: "10.56.0.18" } }
{ remote_addr: { RawQuery: "" Opaque: "" ForceQuery: false RawFragment: "" Scheme: "pulsar" Fragment: "" RawPath: "" Path: "" Host: "pulsar-proxy.pulsar:6650" User: null } local_addr: { Zone: "" Port: 42612 IP: "10.56.0.21" } level: "warn" }
{ level: "warn" remote_addr: { User: null Path: "" Scheme: "pulsar" RawQuery: "" Fragment: "" Opaque: "" Host: "pulsar-proxy.pulsar:6650" ForceQuery: false RawPath: "" RawFragment: "" } local_addr: { Zone: "" IP: "10.56.0.21" Port: 42612 } error: "write tcp 10.56.0.21:42612->10.0.1.68:6650: use of closed network connection" }
{ local_addr: { Zone: "" Port: 51340 IP: "10.56.0.17" } remote_addr: { User: null ForceQuery: false RawPath: "" RawFragment: "" RawQuery: "" Path: "" Host: "pulsar-proxy.pulsar:6650" Opaque: "" Fragment: "" Scheme: "pulsar" } level: "warn" }
The Pulsar logs report the following (obtained by grepping for "exception"):
java.lang.NullPointerException: null
org.apache.bookkeeper.mledger.ManagedLedgerException: Unknown exception
08:25:35.597 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/SpaceEvents / cloud-pulsar-tester] Error reading entries at 1430:0 : Unknown exception, Read Type Replay - Retrying to read in 15.0 seconds
java.lang.NullPointerException: null
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1776) ~[org.apache.pulsar-pulsar-common-2.6.1.jar:2.6.1]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:87) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:192) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:480) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
java.lang.NullPointerException: null
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:125) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:192) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:480) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
java.lang.NullPointerException: null
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
08:27:09.703 [bookkeeper-ml-workers-OrderedExecutor-3-0] WARN org.apache.bookkeeper.mledger.impl.OpReadEntry - [public/default/persistent/SpaceEvents][cloud-pulsar-tester] read failed from ledger at position:2193:0 : Unknown exception
08:27:09.703 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - [persistent://public/default/SpaceEvents / cloud-pulsar-tester] Error reading entries at 2193:0 : Unknown exception, Read Type Normal - Retrying to read in 15.0 seconds
java.lang.NullPointerException: null
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1776) ~[org.apache.pulsar-pulsar-common-2.6.1.jar:2.6.1]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:87) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:192) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:480) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
java.lang.NullPointerException: null
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:125) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:192) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:480) ~[org.apache.pulsar-pulsar-broker-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
java.lang.NullPointerException: null
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$readEntriesFailed$0(OpReadEntry.java:94) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[org.apache.pulsar-managed-ledger-2.6.1.jar:2.6.1]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
08:27:14.083 [pulsar-proxy-io-2-3] WARN io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
08:27:14.090 [pulsar-proxy-io-2-3] WARN org.apache.pulsar.proxy.server.ProxyConnection - [/10.56.0.21:42612] Got exception NativeIoException : readAddress(..) failed: Connection reset by peer null
08:27:38.090 [pulsar-proxy-io-2-8] WARN io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
08:27:38.090 [pulsar-proxy-io-2-8] WARN org.apache.pulsar.proxy.server.ProxyConnection - [/10.56.0.17:51340] Got exception NativeIoException : readAddress(..) failed: Connection reset by peer null
When inspecting the subscription, via the pulsar-admin cli tool, we see no consumers at all despite having 3 services running (and using your Pulsar Golang library - the native one, no cgo):
$ bin/pulsar-admin topics stats SpaceEvents | jq '.subscriptions["cloud-pulsar-tester"]' -c | jq
{
"msgRateOut": 0,
"msgThroughputOut": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"msgRateRedeliver": 0,
"chuckedMessageRate": 0,
"msgBacklog": 181626,
"msgBacklogNoDelayed": 181626,
"blockedSubscriptionOnUnackedMsgs": false,
"msgDelayed": 0,
"unackedMessages": 0,
"type": "Key_Shared",
"msgRateExpired": 0,
"lastExpireTimestamp": 0,
"lastConsumedFlowTimestamp": 1598516857278,
"lastConsumedTimestamp": 0,
"lastAckedTimestamp": 0,
"consumers": [],
"isDurable": true,
"isReplicated": false
}
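The check done by eye on the stats output above can be sketched in Go. This is only an illustration: `subscriptionStats` and `diagnose` are hypothetical names, and the struct mirrors just three of the fields shown in the JSON (`msgBacklog`, `msgRateOut`, `consumers`).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// subscriptionStats mirrors only the fields used below; the field names are
// taken from the pulsar-admin output shown above.
type subscriptionStats struct {
	MsgBacklog int64             `json:"msgBacklog"`
	MsgRateOut float64           `json:"msgRateOut"`
	Consumers  []json.RawMessage `json:"consumers"`
}

// diagnose is a hypothetical helper that classifies one subscription's stats.
func diagnose(raw []byte) (string, error) {
	var s subscriptionStats
	if err := json.Unmarshal(raw, &s); err != nil {
		return "", err
	}
	switch {
	case s.MsgBacklog > 0 && len(s.Consumers) == 0:
		return "backlog with no consumers attached", nil
	case s.MsgBacklog > 0 && s.MsgRateOut == 0:
		return "backlog with connected but idle consumers", nil
	default:
		return "ok", nil
	}
}

func main() {
	raw := []byte(`{"msgRateOut":0,"msgBacklog":181626,"consumers":[]}`)
	verdict, err := diagnose(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(verdict)
}
```

Running something like this periodically against each subscription would flag both situations reported in this thread: a backlog with an empty consumer list, and a backlog with consumers attached but nothing being dispatched.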
If we inspect the Golang µservices logs with a lower debug level we see logs coming from the underlying Pulsar Golang library as shown below (please note that the following logs are regularly printed also after the consumers get stuck):
{"level":"debug","severity":"DEBUG","time":"2020-08-27T09:14:12Z","topic":"SpaceEvents"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
{"level":"debug","local_addr":{"IP":"10.56.0.17","Port":51336,"Zone":""},"remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"pulsar-proxy.pulsar:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":"","RawFragment":""},"severity":"DEBUG","time":"2020-08-27T09:14:22Z"}
We're also getting these from your Pulsar lib:
{ msgID: { partition: -1 ledgerId: 389 entryId: 6196 } name: "cloud-pulsar-tester-se-c2-f5lgn" topic: "persistent://public/default/SpaceEvents" subscription: "cloud-pulsar-tester" validationError: 2 consumerID: 1 level: "error" }
{ local_addr: { Zone: "" IP: "10.56.2.26" Port: 46050 } error: "corrupted message" consumerID: 1 remote_addr: { Path: "" Fragment: "" RawFragment: "" Opaque: "" ForceQuery: false RawQuery: "" Scheme: "pulsar" Host: "pulsar-proxy.pulsar:6650" User: null RawPath: "" } level: "error" }
Those came out with v0.2.0 too which was released ~13 hours ago.
Sorry, I was somehow dropping the message attribute from the logs coming from your Golang library (i.e. when converting them to the Stackdriver format). These should hopefully make more sense:
{ message: "Received too big frame size. size=1869573421" level: "warn" local_addr: { IP: "10.56.0.34" Port: 60946 Zone: "" } remote_addr: { Opaque: "" User: null RawFragment: "" RawPath: "" Host: "pulsar-proxy.pulsar:6650" Path: "" Scheme: "pulsar" RawQuery: "" Fragment: "" ForceQuery: false } }
{ message: "Failed to write on connection" error: "write tcp 10.56.0.34:60946->10.0.1.68:6650: use of closed network connection" level: "warn" local_addr: { Zone: "" Port: 60946 IP: "10.56.0.34" } remote_addr: { Path: "" Host: "pulsar-proxy.pulsar:6650" RawFragment: "" Opaque: "" Scheme: "pulsar" User: null RawQuery: "" ForceQuery: false RawPath: "" Fragment: "" } }
After further investigation I was able to come across two scenarios, which may be related.
- The stats tool reports several messages in the msgBacklog for the given topic/subscription and it shows all the consumers as connected but with a msgRateOut of 0
- The consumers reach the flow stage and then hang forever waiting for a MESSAGE command that never comes, meaning they connect to the broker, send the 1000 permits and then just get pings from here, no messages whatsoever
2020-08-31 08:37:07.728 INFO [139696078108480] Client:88 | Subscribing on Topic :SpaceEvents
2020-08-31 08:37:07.728 INFO [139696078108480] ConnectionPool:85 | Created connection for pulsar://localhost:6650
2020-08-31 08:37:07.729 INFO [139696048822016] ClientConnection:343 | [[::1]:57112 -> [::1]:6650] Connected to broker
2020-08-31 08:37:08.286 INFO [139696048822016] HandlerBase:53 | [persistent://public/default/SpaceEvents, cloud-pulsar-tester, 0] Getting connection from pool
2020-08-31 08:37:08.426 INFO [139696048822016] ConnectionPool:85 | Created connection for pulsar://10.56.3.23:6650
2020-08-31 08:37:08.427 INFO [139696048822016] ClientConnection:345 | [[::1]:57114 -> [::1]:6650] Connected to broker through proxy. Logical broker: pulsar://10.56.3.23:6650
2020-08-31 08:37:08.968 WARN [139696048822016] ClientConnection:947 | [[::1]:57114 -> [::1]:6650] Received error response from server: UnknownError -- req_id: 0
2020-08-31 08:37:08.968 ERROR [139696048822016] ConsumerImpl:242 | [persistent://public/default/SpaceEvents, cloud-pulsar-tester, 0] Failed to create consumer: UnknownError
Traceback (most recent call last):
  File "main.py", line 4, in <module>
    consumer = client.subscribe('SpaceEvents', 'cloud-pulsar-tester')
  File "/home/francesco/.local/lib/python3.8/site-packages/pulsar/__init__.py", line 655, in subscribe
    c._consumer = self._client.subscribe(topic, subscription_name, conf)
Exception: Pulsar error: UnknownError
2020-08-31 08:37:08.974 INFO [139696078108480] ClientConnection:1387 | [[::1]:57114 -> [::1]:6650] Connection closed
2020-08-31 08:37:08.974 INFO [139696078108480] ClientConnection:1387 | [[::1]:57112 -> [::1]:6650] Connection closed
2020-08-31 08:37:08.974 INFO [139696078108480] ClientConnection:238 | [[::1]:57114 -> [::1]:6650] Destroyed connection
2020-08-31 08:37:08.974 INFO [139696078108480] ClientConnection:238 | [[::1]:57112 -> [::1]:6650] Destroyed connection
This is the Python code I used:
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('SpaceEvents', 'cloud-pulsar-tester')

def consume():
    while True:
        msg = consumer.receive()
        try:
            print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
            # Acknowledge successful processing of the message
            consumer.acknowledge(msg)
        except Exception:
            # Message failed to be processed
            consumer.negative_acknowledge(msg)

if __name__ == '__main__':
    consume()
    client.close()
And the requirements.txt:
pulsar-client==2.6.1
apache-bookkeeper-client==4.11.0
grpcio<1.26.0
trace)
- The stats tool reports no consumers at all despite all clients printing the PING/PONG successfully in their logs
- The stats tool reports a msgBacklog greater than zero, so there are messages waiting to be processed
- Trying to ack is printed in the logs whenever an ack takes more than 150ms
The last bullet point means that I changed this code:
func (pc *partitionConsumer) runEventsLoop() {
	defer func() {
		pc.log.Debug("exiting events loop")
	}()
	for {
		select {
		case <-pc.closeCh:
			return
		case i := <-pc.eventsCh:
			switch v := i.(type) {
			case *ackRequest:
				pc.internalAck(v)
Like this:
func (pc *partitionConsumer) runEventsLoop() {
	defer func() {
		pc.log.Debug("exiting events loop")
	}()
	for {
		select {
		case <-pc.closeCh:
			return
		case i := <-pc.eventsCh:
			switch v := i.(type) {
			case *ackRequest:
				ctx, cancel := context.WithCancel(context.Background())
				go func(v *ackRequest) {
					for {
						select {
						case <-ctx.Done():
							return
						default:
							pc.log.Infof("Trying to ack %+v (%d - %d)", v.msgID, len(pc.eventsCh), cap(pc.eventsCh))
							time.Sleep(150 * time.Millisecond)
						}
					}
				}(v)
				pc.internalAck(v)
				cancel()
If the ack happens within 150ms we don't see any logs. The problem is that now I have all consumers stuck in a never ending loop just printing:
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:11+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T18:52:12+02:00","topic":"persistent://public/default/SpaceEvents"}
This has been going on for several hours and the message ID of the log entry is always the same until I kill the consumer.
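As a side note on the instrumentation itself: the loop above logs before its first sleep, so it can print a line even for an ack that completes quickly. A variant that stays silent until the threshold has actually elapsed could look like the sketch below. `watchSlow` is a hypothetical helper, not part of the Pulsar Go library, and `op` stands in for the `pc.internalAck(v)` call.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// watchSlow runs op and calls report once per interval for as long as op is
// still running. If op returns before the first interval elapses, report is
// normally never called.
func watchSlow(interval time.Duration, report func(), op func()) {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				report()
			}
		}
	}()
	op()
	cancel()
	wg.Wait() // no report can fire after watchSlow returns
}

func main() {
	start := time.Now()
	report := func() {
		fmt.Println("still waiting after", time.Since(start).Round(time.Millisecond))
	}

	// Fast operation: finishes before the first tick, so nothing is printed.
	watchSlow(150*time.Millisecond, report, func() {})

	// Slow operation: a line is printed roughly every 150ms while it runs.
	watchSlow(150*time.Millisecond, report, func() { time.Sleep(400 * time.Millisecond) })
}
```

Using a ticker instead of `default` plus `time.Sleep` also lets the watcher goroutine exit as soon as the context is cancelled rather than at the end of the current sleep.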
Could it be that, given that the eventsCh is a buffered channel of 3, when a connection gets closed due to a message frame size that is too big then the runEventsLoop() never gets to process the *connectionClosed event due to at least 3 in-flight ack requests?
Meaning: we could have 3 ack requests that are already keeping the channel full, the connection gets closed, the acks can't be processed because the connection was closed and we cannot reconnect to the broker because, due to the channel being full, we can't push *connectionClosed event into the eventsCh thus it never gets processed = deadlock?
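The suspected cycle can be reproduced in isolation with a small model. This is a sketch under stated assumptions, not the library's code: `ackRequest`, `connectionClosed` and `closedEventDelivered` are hypothetical stand-ins, and the model assumes that an ack cannot complete until the connection is usable again. A timeout is used so the program reports the stall instead of hanging.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical stand-ins for the two event kinds discussed above.
type ackRequest struct{ id int }
type connectionClosed struct{}

// closedEventDelivered reports whether a connectionClosed event could be
// pushed into the events channel within timeout while acks are queued.
func closedEventDelivered(capacity int, ackBlocks bool, timeout time.Duration) bool {
	eventsCh := make(chan interface{}, capacity)
	reconnected := make(chan struct{})
	stop := make(chan struct{})
	defer close(stop) // releases the simulated event loop on return

	picked := make(chan struct{}, 1)
	go func() { // simplified events loop
		for {
			select {
			case <-stop:
				return
			case ev := <-eventsCh:
				switch ev.(type) {
				case *ackRequest:
					select {
					case picked <- struct{}{}:
					default:
					}
					if ackBlocks {
						// Assumed behaviour: the ack cannot finish until the
						// connection is usable again.
						select {
						case <-reconnected:
						case <-stop:
							return
						}
					}
				case *connectionClosed:
					close(reconnected) // reconnection would start here
				}
			}
		}
	}()

	eventsCh <- &ackRequest{id: 0} // the ack the loop is currently handling
	<-picked
	for i := 1; i <= capacity; i++ {
		eventsCh <- &ackRequest{id: i} // queued acks that fill the buffer
	}

	// The connection goroutine now tries to report the closure.
	select {
	case eventsCh <- &connectionClosed{}:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	fmt.Println("acks block, closed event delivered:", closedEventDelivered(3, true, 200*time.Millisecond))
	fmt.Println("acks return, closed event delivered:", closedEventDelivered(3, false, 200*time.Millisecond))
}
```

In the blocking case the loop is stuck on one ack, the buffer of 3 is full, and the closed event can never be enqueued, which is the shape of the deadlock described above. When acks return promptly the loop keeps draining the channel and the event gets through.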
In support of this theory I can see this in the logs right before I get the never ending Trying to ack loop:
{"error":"write tcp [::1]:35530-\u003e[::1]:6650: use of closed network connection","level":"warn","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Failed to write on connection","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"WARNING","time":"2020-08-31T17:47:23+02:00"}
{"level":"debug","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Write data: 25","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"DEBUG","time":"2020-08-31T17:47:23+02:00"}
{"error":"write tcp [::1]:35530-\u003e[::1]:6650: use of closed network connection","level":"warn","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Failed to write on connection","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"WARNING","time":"2020-08-31T17:47:23+02:00"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:23+02:00","topic":"persistent://public/default/SpaceEvents"}
{"level":"info","local_addr":{"IP":"::1","Port":35530,"Zone":""},"message":"Connection closed","remote_addr":{"Scheme":"pulsar","Opaque":"","User":null,"Host":"localhost:6650","Path":"","RawPath":"","ForceQuery":false,"RawQuery":"","Fragment":""},"severity":"INFO","time":"2020-08-31T17:47:23+02:00"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:23+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
{"consumerID":1,"level":"info","message":"Trying to ack {messageID:{ledgerID:122209 entryID:146 batchIdx:0 partitionIdx:0} tracker:\u003cnil\u003e consumer:0xc000336000 receivedTime:{wall:13820250669831506541 ext:18161550576 loc:0x14dc660}} (3 - 3)","name":"nebuchadnezzar","severity":"INFO","subscription":"cloud-pulsar-tester","time":"2020-08-31T17:47:24+02:00","topic":"persistent://public/default/SpaceEvents"}
Also by analyzing the stack dump I can see that one of the 21 goroutines running is stuck waiting here.
1 @ 0x449d3b 0x41229f 0x412095 0xc5b3f5 0xc46394 0xc4ceb9 0xc41f5e 0xc4ccc6 0x47d571
# 0xc5b3f4 github.com/apache/pulsar-client-go/pulsar.(*partitionConsumer).ConnectionClosed+0x64 /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/consumer_partition.go:567
# 0xc46393 github.com/apache/pulsar-client-go/pulsar/internal.(*connection).Close+0x583 /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:751
# 0xc4ceb8 github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run.func1+0x168 /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:363
# 0xc41f5d github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run+0x2bd /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:369
# 0xc4ccc5 github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1+0x85 /home/francesco/Code/netdata/cloud-pulsar-tester/pulsar-client-go/pulsar/internal/connection.go:231
Killing the consumer in this case helps but it eventually gets stuck again trying to ack some other message.
We could potentially try to look into the second scenario but have no clue whatsoever about the first one and we have no Java expertise. Can someone please look a bit more into this and tell us whether you need more information? Thanks.
@fracasula It looks like the broker error log is the same as in https://github.com/streamnative/pulsar/issues/1366, which I ran into before while working on some performance tests.
Also, @BewareMyPower, would you please take a look at the cpp error stack?
@jiazhai Looks like the cpp client's error is just caused by the broker.
// ClientConnection.cc
case BaseCommand::ERROR: {
    const CommandError& error = incomingCmd_.error();
    Result result = getResult(error.error());
    LOG_WARN(cnxString_ << "Received error response from server: " << result
                        << " -- req_id: " << error.request_id());
The broker gave an unknown ServerError or just an UnknownError when processing the subscribe command.
@wolfstudy, please take a look at the go-client issue. It looks like a command is not handled well in the protocol.
@fracasula I have pushed a new docker image to the docker hub https://hub.docker.com/layers/streamnative/pulsar/2.6.1-sn-hotfix-1/images/sha256-6ad564327342355d57fa12ba5a2f35ca84957c9949313524095a58623cfe0834?context=explore
This image is based on 2.6.1 and contains https://github.com/streamnative/pulsar/pull/1436. You can upload the broker log file if you reproduce the problem.
@codelipenghui thanks! I'll give it a try :+1:
@codelipenghui I replaced apachepulsar/pulsar:2.6.1 with streamnative/pulsar:2.6.1-sn-hotfix-1 in my Kubernetes configuration but now I have the pods failing on startup with this error:
/usr/bin/env: ‘python’: No such file or directory
Here's an excerpt of my configuration from the Pulsar broker deployment:
containers:
- args:
  - |
    bin/apply-config-from-env.py conf/broker.conf && bin/apply-config-from-env.py conf/pulsar_env.sh && bin/apply-config-from-env.py conf/client.conf && bin/gen-yml-from-env.py conf/functions_worker.yml && exec bin/pulsar broker
  command:
  - sh
  - -c
  env:
  - name: advertisedAddress
    valueFrom:
      fieldRef:
        apiVersion: v1
        fieldPath: status.podIP
  envFrom:
  - configMapRef:
      name: pulsar-broker
  image: streamnative/pulsar:2.6.1-sn-hotfix-1
  imagePullPolicy: IfNotPresent
@fracasula Oh, I see. I will build a new image for you.
@fracasula You can try this one https://hub.docker.com/layers/streamnative/pulsar/2.6.1-sn-hotfix-2/images/sha256-765a82e1601e64cdb8ae8f8d7bd38a4ba936cf5f16557c7f7bb36904813bf0ff?context=explore
@codelipenghui I got this coming from the broker:
11:35:08.153 [main] ERROR org.apache.pulsar.PulsarBrokerStarter - Failed to start pulsar service.
org.apache.pulsar.broker.PulsarServerException: org.apache.pulsar.broker.PulsarServerException: java.io.IOException: No offloader found for driver 'google-cloud-storage'. Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`.
at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:587) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
at org.apache.pulsar.PulsarBrokerStarter$BrokerStarter.start(PulsarBrokerStarter.java:280) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
at org.apache.pulsar.PulsarBrokerStarter.main(PulsarBrokerStarter.java:349) [org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
Caused by: org.apache.pulsar.broker.PulsarServerException: java.io.IOException: No offloader found for driver 'google-cloud-storage'. Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`.
at org.apache.pulsar.broker.PulsarService.createManagedLedgerOffloader(PulsarService.java:912) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:458) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
... 2 more
Caused by: java.io.IOException: No offloader found for driver 'google-cloud-storage'. Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`.
at org.apache.bookkeeper.mledger.offload.Offloaders.getOffloaderFactory(Offloaders.java:42) ~[org.apache.pulsar-managed-ledger-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
at org.apache.pulsar.broker.PulsarService.createManagedLedgerOffloader(PulsarService.java:893) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
at org.apache.pulsar.broker.PulsarService.start(PulsarService.java:458) ~[org.apache.pulsar-pulsar-broker-2.6.1-sn-hotfix-2.jar:2.6.1-sn-hotfix-2]
... 2 more
BookKeeper, ZooKeeper and the proxy seem fine, though.
@fracasula interesting... did you specify the offloader such as managedLedgerOffloadDriver in the broker.conf? The default managedLedgerOffloadDriver is null, so the broker will not find the google-cloud-storage driver.
@codelipenghui yeah I think I did, in fact it works with the usual apachepulsar/pulsar:2.6.1 image. Here's an excerpt from the deployment:
containers:
- args:
  - |
    bin/apply-config-from-env.py conf/broker.conf && bin/apply-config-from-env.py conf/pulsar_env.sh && bin/apply-config-from-env.py conf/client.conf && bin/gen-yml-from-env.py conf/functions_worker.yml && exec bin/pulsar broker
  command:
  - sh
  - -c
  envFrom:
  - configMapRef:
      name: pulsar-broker
As you can see it applies the configuration coming from the environment variables which are defined in the below configmap:
apiVersion: v1
data:
  PULSAR_EXTRA_OPTS: -Dpulsar.log.root.level=trace
  PULSAR_GC: '"-XX:+UseG1GC -XX:MaxGCPauseMillis=10"'
  PULSAR_LOG_LEVEL: trace
  PULSAR_LOG_ROOT_LEVEL: trace
  PULSAR_MEM: '"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=2g -Dio.netty.leakDetectionLevel=disabled
    -Dio.netty.recycler.linkCapacity=1024 -XX:+ParallelRefProcEnabled -XX:+UnlockExperimentalVMOptions
    -XX:+AggressiveOpts -XX:+DoEscapeAnalysis -XX:ParallelGCThreads=32 -XX:ConcGCThreads=32
    -XX:G1NewSizePercent=50 -XX:+DisableExplicitGC -XX:-ResizePLAB -XX:+ExitOnOutOfMemoryError
    -XX:+PerfDisableSharedMem"'
  allowAutoTopicCreationType: non-partitioned
  autoSkipNonRecoverableData: "true"
  backlogQuotaDefaultRetentionPolicy: producer_exception
  clusterName: pulsar
  configurationStoreServers: pulsar-zookeeper-0.pulsar-zookeeper,pulsar-zookeeper-1.pulsar-zookeeper,pulsar-zookeeper-2.pulsar-zookeeper
  deduplicationEnabled: "true"
  defaultRetentionSizeInMB: "-1"
  defaultRetentionTimeInMinutes: "-1"
  exposeConsumerLevelMetricsInPrometheus: "true"
  exposeTopicLevelMetricsInPrometheus: "true"
  gcsManagedLedgerOffloadBucket: netdata-pulsar-debugging
  gcsManagedLedgerOffloadRegion: ""
  gcsManagedLedgerOffloadServiceAccountKeyFile: /pulsar/gcp-service-account/key
  managedLedgerDefaultAckQuorum: "2"
  managedLedgerDefaultEnsembleSize: "2"
  managedLedgerDefaultWriteQuorum: "2"
  managedLedgerOffloadAutoTriggerSizeThresholdBytes: "32000000"
  managedLedgerOffloadDeletionLagMs: "7200000"
  managedLedgerOffloadDriver: google-cloud-storage
  maxConsumersPerSubscription: "500"
  maxConsumersPerTopic: "500"
  maxProducersPerTopic: "100"
  zookeeperServers: pulsar-zookeeper-0.pulsar-zookeeper,pulsar-zookeeper-1.pulsar-zookeeper,pulsar-zookeeper-2.pulsar-zookeeper
kind: ConfigMap
@fracasula Interesting, I think 2.6.1 does not have any offloader drivers by default. If you are using an offloader, you can try https://hub.docker.com/layers/streamnative/pulsar-all/2.6.1-sn-hotfix-2/images/sha256-2429d29d86e8eac252d055ed10541353f3f89d9569512048bebea0b35eb71d01?context=explore
WARN[0034] Received too big frame size. size=234969386
What version of the Go client are you using? This error occurs because the default maximum message size allowed by Pulsar is 5MB; if a frame exceeds this value, the above error is thrown. In 0.2.0 of the Go client, we allow users to configure the max frame size. You can try updating the Go client to 0.2.0 and configuring the max message size in broker.conf to see if that solves your problem.
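For readers unfamiliar with this kind of check, here is a minimal sketch of validating a length-prefixed frame against a limit. It assumes the frame starts with a 4-byte big-endian total size and uses the 5MB default mentioned above; the 10KB padding allowance, the constant names and `frameSize` are assumptions made for illustration and are not taken from the Go client's source.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Assumed limits for illustration: the 5MB default message size mentioned
// above plus a small allowance for headers (the padding is an assumption).
const (
	maxMessageSize = 5 * 1024 * 1024
	framePadding   = 10 * 1024
	maxFrameSize   = maxMessageSize + framePadding
)

var errFrameTooBig = errors.New("frame too big")

// frameSize decodes the 4-byte big-endian length prefix and rejects frames
// that are larger than maxFrameSize.
func frameSize(header []byte) (uint32, error) {
	if len(header) < 4 {
		return 0, errors.New("short frame header")
	}
	size := binary.BigEndian.Uint32(header[:4])
	if size > maxFrameSize {
		return size, fmt.Errorf("%w: size=%d max=%d", errFrameTooBig, size, maxFrameSize)
	}
	return size, nil
}

func main() {
	// Encode the size reported in the log line above and run the check.
	hdr := make([]byte, 4)
	binary.BigEndian.PutUint32(hdr, 234969386)
	if _, err := frameSize(hdr); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

A size such as 234969386 is far above any sensible limit, so a reader performing this kind of check has little choice but to drop the connection, which matches the "use of closed network connection" warnings that follow in the logs.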
@wolfstudy thanks we'll take that into consideration. Out of curiosity, how come the frame size is not checked when the message is being published? This way we would get an error when trying to write a message that is too big instead of when consuming it when it may be too late.
Out of curiosity, how come the frame size is not checked when the message is being published?
@fracasula Good question.
So in https://github.com/apache/pulsar-client-go/pull/263, we check the maxMessageSize value and when the sent message is larger than maxMessageSize, we try to close the connection.
@fracasula Does the new image work for you? If you hit any problems, please ping me.
@codelipenghui yeah the last image is definitely working, thanks! I haven't got back to you because I'm off till next week and had trouble replicating the "silent broker" issue; it's quite random and hard to replicate. I think most of the times we had stuck consumers was due to client issues (Golang deadlock).
I think most of the times we had stuck consumers was due to client issues (Golang deadlock).
Hello @fracasula, can you provide more context on the Golang deadlock?
Hi @fracasula
is this issue affecting the Go client only or other clients as well ?
Also, make sure that you are not affected by this issue https://github.com/apache/pulsar/pull/8024 which was introduced in 2.6.0
is this issue affecting the Go client only or other clients as well ?
Hi @alexku7,
if you run the test I posted and set up a few breakpoints you'll realize that this is a Golang library issue. I tried with Python multiple times and it never got deadlocked while acking.
Also, make sure that you are not affected by this issue #8024 which was introduced in 2.6.0
I'm running on the image provided by @codelipenghui. How do I make sure that I'm not affected by that issue you're mentioning? Also, how is that related to a client deadlock? Are you referring to the "silent broker" scenario I described above instead?
Hi @fracasula
Honestly, I didn't run the tests. I just wanted to make sure that our Java client is not affected :)
Regarding https://github.com/apache/pulsar/pull/8024, I mentioned it because you are describing symptoms very similar to ones we have suffered from a lot: a stuck consumer with a Key_shared subscription that most likely is not resolved by a consumer restart. Sometimes you try to resolve one issue while it is actually masked by another, similar issue, which is very misleading.
Anyway, https://github.com/apache/pulsar/pull/8024 was resolved only last Friday by 2.6.1 sn-3, so I think that most likely you are working with earlier images.
Honestly, I didn't run the tests. I just wanted to make sure that our Java client is not affected :)
Cool, please provide feedback here if possible so that I can start working on a fix :+1:
Anyway, #8024 was resolved only last Friday by 2.6.1 sn-3, so I think that most likely you are working with earlier images.
I'm running on this one: https://hub.docker.com/layers/streamnative/pulsar-all/2.6.1-sn-hotfix-2/images/sha256-2429d29d86e8eac252d055ed10541353f3f89d9569512048bebea0b35eb71d01?context=explore
Can you please provide the latest with the hotfix (additional logs for the NPE) to see if I can replicate the "silent broker" issue with it?
@fracasula 2.6.1-sn-3 is based on the 2.6.1-sn-hotfix-2, and 2.6.1-sn-3 has fixed another issue #8024, so if you met the issue that #8024 fixed, you can upgrade to 2.6.1-sn-3
Hi guys, about the Golang deadlock: I'm able to replicate it with the latest image too (i.e. streamnative/pulsar-all:2.6.1-sn-3). It still shows that the client is stuck acking, but it may not be just a client issue, as suggested by @alexku7. By further developing my test I was able to line up the client and server logs, and I can say that I consistently get this on the server side each time the client gets stuck:
time="2020-09-17T09:15:46+02:00" level=info msg="Error reading from connection" error="Short read when reading frame size: read tcp [::1]:48104->[::1]:32771: use of closed network connection" local_addr="[::1]:48104" remote_addr="pulsar://localhost:32771"
07:15:46.482 [pulsar-io-51-16] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.17.0.1:38668
07:15:46.482 [pulsar-io-51-16] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/topic-name-61b9697b6f087aca, name=subscription-name-756c1ab9019bbcdc}, consumerId=1, consumerName=consumer-name-249de6d4895d0e15, address=/172.17.0.1:38668} with pending 200 acks
07:15:46.486 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca-subscription-name-756c1ab9019bbcdc] Rewind from 13:700 to 13:400
consumer_partition_integration_test.go:112: [2020-09-17 09:15:46.62477658 +0200 CEST m=+181.898034767] Trying to ack 13:426:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446798277 +0200 CEST m=+171.004720824] Trying to ack 13:1318:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.44681232 +0200 CEST m=+171.004734868] Trying to ack 13:1332:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.44682636 +0200 CEST m=+171.004748901] Trying to ack 13:1334:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446840804 +0200 CEST m=+171.004763351] Trying to ack 13:1337:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446756504 +0200 CEST m=+171.004679062] Trying to ack 13:1324:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446845301 +0200 CEST m=+171.004767844] Trying to ack 13:1338:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446863899 +0200 CEST m=+171.004786443] Trying to ack 13:1340:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446876519 +0200 CEST m=+171.004799061] Trying to ack 13:1341:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446783149 +0200 CEST m=+171.004705702] Trying to ack 13:1328:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446772341 +0200 CEST m=+171.004694903] Trying to ack 13:1322:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446789742 +0200 CEST m=+171.004712290] Trying to ack 13:1321:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446902955 +0200 CEST m=+171.004825503] Trying to ack 13:1345:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.44692216 +0200 CEST m=+171.004844705] Trying to ack 13:1347:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446938844 +0200 CEST m=+171.004861396] Trying to ack 13:1348:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446957473 +0200 CEST m=+171.004880021] Trying to ack 13:1351:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446980957 +0200 CEST m=+171.004903502] Trying to ack 13:1349:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446985213 +0200 CEST m=+171.004907763] Trying to ack 13:1353:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446798504 +0200 CEST m=+171.004721046] Trying to ack 13:1323:0
consumer_partition_integration_test.go:112: [2020-09-16 18:16:12.446791901 +0200 CEST m=+171.004714458] Trying to ack 13:1326:0
...
Do you think that's something that could cause the consumer to hang forever waiting for those pending acks to be processed?
I ran it multiple times, here's the output the second time:
08:33:20.866 [pulsar-io-51-7] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:40022] Created subscription on topic persistent://public/default/topic-name-61b9697b6f087aca / subscription-name-2da7ed37505244dc
08:33:20.960 [pulsar-io-51-7] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.17.0.1:40022
08:33:20.960 [pulsar-io-51-7] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/topic-name-61b9697b6f087aca, name=subscription-name-2da7ed37505244dc}, consumerId=1, consumerName=consumer-name-748c24a16b99354a, address=/172.17.0.1:40022} with pending 148 ack
Third:
08:34:45.073 [pulsar-io-51-8] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /172.17.0.1:40030
08:34:45.074 [pulsar-io-51-8] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:40030] Subscribing on topic persistent://public/default/topic-name-61b9697b6f087aca / subscription-name-3afa9641ff625519
08:34:45.074 [pulsar-io-51-8] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca] Cursor subscription-name-3afa9641ff625519 recovered to position 41:99999
08:34:45.074 [pulsar-io-51-8] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/topic-name-61b9697b6f087aca] Creating ledger, metadata: {component=[109, 97, 110, 97, 103, 101, 100, 45, 108, 101, 100, 103, 101, 114], pulsar/managed-ledger=[112, 117, 98, 108, 105, 99, 47, 100, 101, 102, 97, 117, 108, 116, 47, 112, 101, 114, 115, 105, 115, 116, 101, 110, 116, 47, 116, 111, 112, 105, 99, 45, 110, 97, 109, 101, 45, 54, 49, 98, 57, 54, 57, 55, 98, 54, 102, 48, 56, 55, 97, 99, 97], pulsar/cursor=[115, 117, 98, 115, 99, 114, 105, 112, 116, 105, 111, 110, 45, 110, 97, 109, 101, 45, 51, 97, 102, 97, 57, 54, 52, 49, 102, 102, 54, 50, 53, 53, 49, 57], application=[112, 117, 108, 115, 97, 114]} - metadata ops timeout : 60 seconds
08:34:45.079 [main-EventThread] INFO org.apache.bookkeeper.client.LedgerCreateOp - Ensemble: [127.0.0.1:3181] for ledger: 45
08:34:45.081 [BookKeeperClientWorker-OrderedExecutor-5-0] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [public/default/persistent/topic-name-61b9697b6f087aca] [subscription-name-3afa9641ff625519] Updating cursor info ledgerId=45 mark-delete=41:99999
08:34:45.084 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca] Updated cursor subscription-name-3afa9641ff625519 with ledger id 45 md-position=41:99999 rd-position=41:100000
08:34:45.084 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/topic-name-61b9697b6f087aca] Opened new cursor: ManagedCursorImpl{ledger=public/default/persistent/topic-name-61b9697b6f087aca, name=subscription-name-3afa9641ff625519, ackPos=41:99999, readPos=41:100000}
08:34:45.084 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca-subscription-name-3afa9641ff625519] Rewind from 41:0 to 41:0
08:34:45.085 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic-name-61b9697b6f087aca] There are no replicated subscriptions on the topic
08:34:45.085 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic-name-61b9697b6f087aca][subscription-name-3afa9641ff625519] Created new subscription for 1
08:34:45.085 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:40030] Created subscription on topic persistent://public/default/topic-name-61b9697b6f087aca / subscription-name-3afa9641ff625519
08:34:45.086 [pulsar-io-51-8] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.17.0.1:40030
08:34:45.086 [pulsar-io-51-8] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/topic-name-61b9697b6f087aca, name=subscription-name-3afa9641ff625519}, consumerId=1, consumerName=consumer-name-3f0be6cd39d4812b, address=/172.17.0.1:40030} with pending 0 acks
08:34:45.087 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca-subscription-name-3afa9641ff625519] Rewind from 41:1 to 41:0
08:34:45.187 [pulsar-io-51-9] INFO org.apache.pulsar.broker.service.ServerCnx - New connection from /172.17.0.1:40034
08:34:45.190 [pulsar-io-51-9] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:40034] Subscribing on topic persistent://public/default/topic-name-61b9697b6f087aca / subscription-name-3afa9641ff625519
08:34:45.190 [pulsar-io-51-9] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca-subscription-name-3afa9641ff625519] Rewind from 41:0 to 41:0
08:34:45.190 [pulsar-io-51-9] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic-name-61b9697b6f087aca] There are no replicated subscriptions on the topic
08:34:45.191 [pulsar-io-51-9] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic-name-61b9697b6f087aca][subscription-name-3afa9641ff625519] Created new subscription for 1
08:34:45.191 [pulsar-io-51-9] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:40034] Created subscription on topic persistent://public/default/topic-name-61b9697b6f087aca / subscription-name-3afa9641ff625519
08:34:45.286 [pulsar-io-51-9] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.17.0.1:40034
08:34:45.286 [pulsar-io-51-9] INFO org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/topic-name-61b9697b6f087aca, name=subscription-name-3afa9641ff625519}, consumerId=1, consumerName=consumer-name-3f0be6cd39d4812b, address=/172.17.0.1:40034} with pending 64 acks
08:34:45.286 [bookkeeper-ml-workers-OrderedExecutor-0-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic-name-61b9697b6f087aca-subscription-name-3afa9641ff625519] Rewind from 41:3000 to 41:2836
08:34:50.409 [pulsar-web-69-13] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [17/Sep/2020:08:34:50 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats?getPreciseBacklog=false HTTP/1.1" 200 1283 "-" "Pulsar-Java-v2.6.1-sn-3" 7
https://github.com/apache/pulsar-client-go/pull/376 will fix the consumer blocking issue in the Go client.
Closed the issue since the two issues have been fixed. If there are any new issues, let's use a new issue to track it.
Most helpful comment
After further investigation I was able to come across two scenarios, which may be related.
Silent broker scenario
stats tool reports several messages in the msgBacklog for the given topic/subscription and it shows all the consumers as connected but with a msgRateOut of 0
[...] flow stage and then hang forever waiting for a MESSAGE command that never comes, meaning they connect to the broker, send the 1000 permits and then just get pings from here, no messages whatsoever
This is the Python code I used:
And the requirements.txt:
Possible acking deadlock (Golang client)
[...] trace)
stats tool reports no consumers at all despite all clients printing the PING/PONG successfully in their logs
stats tool reports a msgBacklog greater than zero so there are messages waiting to be processed
[...] Trying to ack on the logs whenever it was taking more than 150ms
The last bullet point means that I changed this code:
Like this:
If the ack happens within 150ms we don't see any logs. The problem is that now I have all consumers stuck in a never ending loop just printing:
This has been going on for several hours and the message ID of the log entry is always the same until I kill the consumer.
Could it be that, given that the eventsCh is a buffered channel of 3, when a connection gets closed due to a message frame size that is too big then the runEventsLoop() never gets to process the *connectionClosed event due to at least 3 in-flight ack requests?
Meaning: we could have 3 ack requests that are already keeping the channel full, the connection gets closed, the acks can't be processed because the connection was closed and we cannot reconnect to the broker because, due to the channel being full, we can't push *connectionClosed event into the eventsCh thus it never gets processed = deadlock?
In support of this theory I can see this in the logs right before I get the never ending Trying to ack loop:
Also by analyzing the stack dump I can see that one of the 21 goroutines running is stuck waiting here.
Killing the consumer in this case helps but it eventually gets stuck again trying to ack some other message.
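The full-channel theory above can be illustrated with a small Go sketch. It does not use the real client code: the `ackRequest` and `connectionClosed` types and the `tryPush` helper are hypothetical stand-ins, and the only detail taken from the description is a buffered channel with a capacity of 3.

```go
package main

import "fmt"

// Hypothetical event types standing in for the client's internal events.
type ackRequest struct{ id int }
type connectionClosed struct{}

// tryPush attempts a non-blocking send and reports whether the event was queued.
func tryPush(ch chan interface{}, ev interface{}) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

func main() {
	eventsCh := make(chan interface{}, 3) // buffered channel of 3, as described above

	// Three in-flight acks fill the buffer while the event loop is busy
	// (here: simply not running), so nothing drains the channel.
	for i := 0; i < 3; i++ {
		fmt.Println("ack queued:", tryPush(eventsCh, &ackRequest{id: i}))
	}

	// A plain `eventsCh <- &connectionClosed{}` would now block forever;
	// the non-blocking variant shows that the event cannot be queued.
	fmt.Println("connectionClosed queued:", tryPush(eventsCh, &connectionClosed{}))
}
```

In this sketch the three acks are queued and the close event is not. If the sender of the close event is also the only goroutine that could drain the channel, a blocking send at that point would never complete.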
We could potentially try to look into the second scenario but have no clue whatsoever about the first one and we have no Java expertise. Can someone please look a bit more into this and tell us whether you need more information? Thanks.