Describe the bug
Log compaction fails due to a timeout.
To Reproduce
Detailed steps to reproduce:
# rm -r data logs # !!! careful !!! this wipes all your topics etc.
bin/pulsar standalone
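import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

// Wrapper class name is arbitrary
public class CompactionRepro {
    public static void main(String[] args) throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://192.168.11.22:6650")
                .build();
        Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("my-topic")
                .create();
        int msgCount = 500000;
        long before = System.currentTimeMillis();
        // Send many identical keyed messages as fast as possible (high throughput is what triggers the problem)
        for (int i = 0; i < msgCount; i++) {
            producer.newMessage()
                    .key("asdf")
                    .value(new byte[] {1, 2, 3, 4, 5})
                    .sendAsync();
        }
        System.out.println("Sent " + msgCount + " messages in " + (System.currentTimeMillis() - before) + " msec");
        // Wait for outstanding batches to be published, then release resources
        producer.flush();
        producer.close();
        pulsarClient.close();
    }
}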
$ bin/pulsar-admin topics compact persistent://public/default/my-topic
Topic compaction requested for persistent://public/default/my-topic
$ bin/pulsar-admin topics compaction-status persistent://public/default/my-topic
Compaction is currently running
$ # WAIT FOR 10 SECONDS
$ bin/pulsar-admin topics compaction-status persistent://public/default/my-topic
Error in compaction
null
Reason: Error compacting: java.util.concurrent.TimeoutException: Timeout
Expected behavior
Compaction completes without an error.
Desktop:
Linux 4.19.0-4-amd64 #1 SMP Debian 4.19.28-2 (2019-03-15) x86_64 unknown unknown GNU/Linux
Logs
The complete logs, from the start of compaction until the error was reported by compaction-status:
09:47:54.409 [pulsar-web-57-8] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
"topicNames" : [ "persistent://public/default/my-topic" ],
"topicsPattern" : null,
"subscriptionName" : "__compaction",
"subscriptionType" : "Exclusive",
"receiverQueueSize" : 1000,
"acknowledgementsGroupTimeMicros" : 100000,
"negativeAckRedeliveryDelayMicros" : 60000000,
"maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
"consumerName" : null,
"ackTimeoutMillis" : 0,
"tickDurationMillis" : 1000,
"priorityLevel" : 0,
"cryptoFailureAction" : "FAIL",
"properties" : { },
"readCompacted" : true,
"subscriptionInitialPosition" : "Latest",
"patternAutoDiscoveryPeriod" : 1,
"regexSubscriptionMode" : "PersistentOnly",
"deadLetterPolicy" : null,
"autoUpdatePartitions" : true,
"replicateSubscriptionState" : false,
"resetIncludeHead" : false,
"keySharedPolicy" : null
}
09:47:54.410 [pulsar-web-57-8] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
"serviceUrl" : "pulsar://localhost:6650",
"authPluginClassName" : null,
"authParams" : null,
"operationTimeoutMs" : 30000,
"statsIntervalSeconds" : 60,
"numIoThreads" : 1,
"numListenerThreads" : 1,
"connectionsPerBroker" : 1,
"useTcpNoDelay" : true,
"useTls" : false,
"tlsTrustCertsFilePath" : null,
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"concurrentLookupRequest" : 5000,
"maxLookupRequest" : 50000,
"maxNumberOfRejectedRequestPerConnection" : 50,
"keepAliveIntervalSeconds" : 30,
"connectionTimeoutMs" : 10000,
"requestTimeoutMs" : 60000,
"initialBackoffIntervalNanos" : 100000000,
"maxBackoffIntervalNanos" : 60000000000
}
09:47:54.411 [pulsar-web-57-8] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [31/1/2020:09:47:54 +0900] "PUT /admin/v2/persistent/public/default/my-topic/compaction HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.5.0" 6
09:47:54.411 [pulsar-client-io-85-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Subscribing to topic on cnx [id: 0xeb973738, L:/127.0.0.1:60180 - R:localhost/127.0.0.1:6650]
09:47:54.412 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Subscribing on topic persistent://public/default/my-topic / __compaction
09:47:54.412 [pulsar-io-50-7] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-__compaction] Rewind from 1544:100 to 1544:0
09:47:54.412 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic] There are no replicated subscriptions on the topic
09:47:54.412 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic][__compaction] Created new subscription for 1
09:47:54.412 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Created subscription on topic persistent://public/default/my-topic / __compaction
09:47:54.413 [pulsar-client-io-85-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
09:47:54.413 [pulsar-client-io-85-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Seek subscription to message id -1:-1:-1
09:47:54.415 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://public/default/my-topic, name=__compaction}, consumerId=1, consumerName=b411f, address=/127.0.0.1:60180}
09:47:54.416 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/my-topic, name=__compaction}, consumerId=1, consumerName=b411f, address=/127.0.0.1:60180}
09:47:54.416 [pulsar-client-io-85-1] INFO org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 1
09:47:54.416 [pulsar-client-io-85-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/my-topic] [__compaction] Closed connection [id: 0xeb973738, L:/127.0.0.1:60180 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
09:47:54.416 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/my-topic][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
09:47:54.416 [bookkeeper-ml-workers-OrderedExecutor-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic] Initiate reset position to 1544:-1 on cursor __compaction
09:47:54.424 [BookKeeperClientWorker-OrderedExecutor-1-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic] reset position to 1544:-1 before current read position 1544:0 on cursor __compaction
09:47:54.425 [BookKeeperClientWorker-OrderedExecutor-1-0] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] [persistent://public/default/my-topic][__compaction] Reset subscription to message id -1:-1
09:47:54.426 [pulsar-client-io-85-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Successfully reset subscription to message id -1:-1:-1
09:47:54.516 [pulsar-timer-87-1] INFO org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/my-topic] [__compaction] Reconnecting after timeout
09:47:54.517 [pulsar-client-io-85-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Subscribing to topic on cnx [id: 0xeb973738, L:/127.0.0.1:60180 - R:localhost/127.0.0.1:6650]
09:47:54.517 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Subscribing on topic persistent://public/default/my-topic / __compaction
09:47:54.518 [pulsar-io-50-7] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-__compaction] Rewind from 1544:-1 to 1544:0
09:47:54.518 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic] There are no replicated subscriptions on the topic
09:47:54.518 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic][__compaction] Created new subscription for 1
09:47:54.518 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Created subscription on topic persistent://public/default/my-topic / __compaction
09:47:54.518 [pulsar-client-io-85-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
09:47:54.526 [pulsar-external-listener-86-1] WARN org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic] [__compaction] Could not get connection while getLastMessageId -- Will try again in 100 ms
09:47:54.527 [pulsar-external-listener-86-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Get topic last message Id
09:47:54.527 [pulsar-client-io-85-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Successfully getLastMessageId 1544:37344
09:47:54.527 [pulsar-client-io-85-1] INFO org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase one of compaction for persistent://public/default/my-topic, reading to 1544:37344:-1
09:47:57.416 [pulsar-web-57-12] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [31/1/2020:09:47:57 +0900] "GET /admin/v2/persistent/public/functions/coordinate/stats HTTP/1.1" 200 887 "-" "Pulsar-Java-v2.5.0" 5
09:48:02.838 [pulsar-web-57-1] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [31/1/2020:09:48:02 +0900] "GET /admin/v2/persistent/public/default/my-topic/compaction HTTP/1.1" 200 35 "-" "Pulsar-Java-v2.5.0" 2
09:48:04.646 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Closing consumer: 1
09:48:04.646 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/my-topic, name=__compaction}, consumerId=1, consumerName=b411f, address=/127.0.0.1:60180}
09:48:04.646 [pulsar-io-50-7] INFO org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Closed consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/my-topic, name=__compaction}, consumerId=1, consumerName=b411f, address=/127.0.0.1:60180}
09:48:04.646 [pulsar-client-io-85-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic] [__compaction] Closed consumer
09:48:05.941 [pulsar-web-57-3] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [31/1/2020:09:48:05 +0900] "GET /admin/v2/persistent/public/default/my-topic/compaction HTTP/1.1" 200 79 "-" "Pulsar-Java-v2.5.0" 2
09:48:27.417 [pulsar-web-57-6] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [31/1/2020:09:48:27 +0900] "GET /admin/v2/persistent/public/functions/coordinate/stats HTTP/1.1" 200 887 "-" "Pulsar-Java-v2.5.0" 5
@EugenDueck
I tried this on my laptop and could not reproduce it easily. Here are my steps to reproduce:
✘ lipenghui@lipenghuideMacBook-Pro ~/Downloads/apache-pulsar-2.5.0/bin master ●✚ ./pulsar-client produce -m '{}' test
10:37:17.928 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xb6f73439, L:/127.0.0.1:53315 - R:localhost/127.0.0.1:6650]] Connected to server
10:37:18.915 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {
"topicName" : "test",
"producerName" : null,
"sendTimeoutMs" : 30000,
"blockIfQueueFull" : false,
"maxPendingMessages" : 1000,
"maxPendingMessagesAcrossPartitions" : 50000,
"messageRoutingMode" : "RoundRobinPartition",
"hashingScheme" : "JavaStringHash",
"cryptoFailureAction" : "FAIL",
"batchingMaxPublishDelayMicros" : 1000,
"batchingPartitionSwitchFrequencyByPublishDelay" : 10,
"batchingMaxMessages" : 1000,
"batchingMaxBytes" : 131072,
"batchingEnabled" : true,
"compressionType" : "NONE",
"initialSequenceId" : null,
"autoUpdatePartitions" : true,
"multiSchema" : true,
"properties" : { }
}
10:37:18.920 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {
"serviceUrl" : "pulsar://localhost:6650/",
"authPluginClassName" : null,
"authParams" : null,
"operationTimeoutMs" : 30000,
"statsIntervalSeconds" : 60,
"numIoThreads" : 1,
"numListenerThreads" : 1,
"connectionsPerBroker" : 1,
"useTcpNoDelay" : true,
"useTls" : false,
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"concurrentLookupRequest" : 5000,
"maxLookupRequest" : 50000,
"maxNumberOfRejectedRequestPerConnection" : 50,
"keepAliveIntervalSeconds" : 30,
"connectionTimeoutMs" : 10000,
"requestTimeoutMs" : 60000,
"initialBackoffIntervalNanos" : 100000000,
"maxBackoffIntervalNanos" : 60000000000
}
10:37:18.989 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [test] [null] Creating producer on cnx [id: 0xb6f73439, L:/127.0.0.1:53315 - R:localhost/127.0.0.1:6650]
10:37:19.053 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [test] [standalone-0-2] Created producer on cnx [id: 0xb6f73439, L:/127.0.0.1:53315 - R:localhost/127.0.0.1:6650]
10:37:19.094 [pulsar-timer-4-1] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
10:37:19.161 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
10:37:19.190 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [test] [standalone-0-2] Closed Producer
10:37:19.214 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0xb6f73439, L:/127.0.0.1:53315 ! R:localhost/127.0.0.1:6650] Disconnected
10:37:19.255 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
lipenghui@lipenghuideMacBook-Pro ~/Downloads/apache-pulsar-2.5.0/bin master ●✚ ./pulsar-client produce -m '{}' test -n 10
10:37:25.969 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x0d76dfd4, L:/127.0.0.1:53316 - R:localhost/127.0.0.1:6650]] Connected to server
10:37:26.548 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {
"topicName" : "test",
"producerName" : null,
"sendTimeoutMs" : 30000,
"blockIfQueueFull" : false,
"maxPendingMessages" : 1000,
"maxPendingMessagesAcrossPartitions" : 50000,
"messageRoutingMode" : "RoundRobinPartition",
"hashingScheme" : "JavaStringHash",
"cryptoFailureAction" : "FAIL",
"batchingMaxPublishDelayMicros" : 1000,
"batchingPartitionSwitchFrequencyByPublishDelay" : 10,
"batchingMaxMessages" : 1000,
"batchingMaxBytes" : 131072,
"batchingEnabled" : true,
"compressionType" : "NONE",
"initialSequenceId" : null,
"autoUpdatePartitions" : true,
"multiSchema" : true,
"properties" : { }
}
10:37:26.558 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {
"serviceUrl" : "pulsar://localhost:6650/",
"authPluginClassName" : null,
"authParams" : null,
"operationTimeoutMs" : 30000,
"statsIntervalSeconds" : 60,
"numIoThreads" : 1,
"numListenerThreads" : 1,
"connectionsPerBroker" : 1,
"useTcpNoDelay" : true,
"useTls" : false,
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"concurrentLookupRequest" : 5000,
"maxLookupRequest" : 50000,
"maxNumberOfRejectedRequestPerConnection" : 50,
"keepAliveIntervalSeconds" : 30,
"connectionTimeoutMs" : 10000,
"requestTimeoutMs" : 60000,
"initialBackoffIntervalNanos" : 100000000,
"maxBackoffIntervalNanos" : 60000000000
}
10:37:26.593 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [test] [null] Creating producer on cnx [id: 0x0d76dfd4, L:/127.0.0.1:53316 - R:localhost/127.0.0.1:6650]
10:37:26.641 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [test] [standalone-0-3] Created producer on cnx [id: 0x0d76dfd4, L:/127.0.0.1:53316 - R:localhost/127.0.0.1:6650]
10:37:26.680 [main] WARN com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
10:37:26.772 [main] INFO org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
10:37:26.797 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [test] [standalone-0-3] Closed Producer
10:37:26.803 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x0d76dfd4, L:/127.0.0.1:53316 ! R:localhost/127.0.0.1:6650] Disconnected
10:37:26.831 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 10 messages successfully produced
lipenghui@lipenghuideMacBook-Pro ~/Downloads/apache-pulsar-2.5.0/bin master ●✚ ./pulsar-admin topics compact test
Topic compaction requested for persistent://public/default/test
✘ lipenghui@lipenghuideMacBook-Pro ~/Downloads/apache-pulsar-2.5.0/bin master ●✚ ./pulsar-admin topics compaction-status test
Compaction was a success
lipenghui@lipenghuideMacBook-Pro ~/Downloads/apache-pulsar-2.5.0/bin master ●✚ ./pulsar-admin topics compaction-status test
Compaction was a success
Am I missing something?
@codelipenghui kannarfr over on Slack #general is seeing the same problem, so it's certainly not just me.
Did you create your producer exactly as I did, and did you produce messages in the same way? (see "detailed steps to reproduce")
I have the same timeout problem. It occurs when I add an empty message to a topic and then start compaction on it.
After this problem appears, I can't read the compacted topic until I reset the broker.
@pienio7 What do you mean by "empty"? Is the value of the message empty? What is the data type of your message?
@EugenDueck Yes, the value of message is empty.
producer.send(''.encode('utf-8'), partition_key=key)
@pienio7 So I guess this is Python, and the data type of your topic is bytes (a byte array), an empty one in this case.
@EugenDueck Yes
Hi, in my case the topic is empty. Either way, the error message that is displayed is not meaningful, although I understand that compaction fails if the topic is empty.
The error can be reproduced with an empty topic, as described by @KannarFr: the compactor tries to read the next message, which is not there, so a scheduled timeout kicks in after ten seconds and completes the compaction with a TimeoutException:
https://github.com/apache/pulsar/blob/13d8ecd20a3c6795405fbf5946c1907e9c90dd91/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java#L106
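A minimal sketch of that failure mode, using only java.util.concurrent. It is not Pulsar's code: `readNextAsync` is a hypothetical stand-in for a read on an empty topic (its future is never completed with a message), and the ten-second timeout is turned into a parameter so the example finishes quickly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompactionTimeoutSketch {

    // Hypothetical stand-in for reading the next message from an empty topic:
    // nothing ever completes this future with a message.
    static CompletableFuture<String> readNextAsync() {
        return new CompletableFuture<>();
    }

    // Arm a timer that fails the read with TimeoutException("Timeout"),
    // and cancel the timer if the read completes first.
    static void scheduleTimeout(ScheduledExecutorService scheduler,
                                CompletableFuture<String> future,
                                long delayMillis) {
        ScheduledFuture<?> timeout = scheduler.schedule(
                () -> future.completeExceptionally(new TimeoutException("Timeout")),
                delayMillis, TimeUnit.MILLISECONDS);
        future.whenComplete((result, error) -> timeout.cancel(false));
    }

    // Performs one read guarded by the timeout and returns a status string
    // in the style of the compaction-status output.
    static String runOnce(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> read = readNextAsync();
            scheduleTimeout(scheduler, read, delayMillis);
            try {
                return "ok: " + read.get();
            } catch (ExecutionException e) {
                // The cause is the TimeoutException set by the timer.
                return "Error compacting: " + e.getCause();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce(200));
    }
}
```

Calling `runOnce(200)` blocks for about 200 ms and returns the same text that compaction-status reported: `Error compacting: java.util.concurrent.TimeoutException: Timeout`.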
Even if a message is produced to the empty topic after compaction has started, compaction still fails with a TimeoutException: the condition that ends the async read loop is never met, because lastMessageId is MessageId.earliest, which is always less than the ID of the message just received:
https://github.com/apache/pulsar/blob/13d8ecd20a3c6795405fbf5946c1907e9c90dd91/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java#L139
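A simplified model of that termination condition, assuming (as the linked line suggests) that the phase-one loop stops only when a received message ID compares equal to lastMessageId. `MsgId` and `readUntil` are hypothetical names; IDs are reduced to ledger:entry pairs, and EARLIEST is modeled as -1:-1, like the -1:-1 reset seen in the logs above.

```java
import java.util.List;

public class PhaseOneLoopSketch {

    // Simplified message id, ordered by ledger id and then entry id.
    static final class MsgId implements Comparable<MsgId> {
        static final MsgId EARLIEST = new MsgId(-1, -1);

        final long ledgerId;
        final long entryId;

        MsgId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(MsgId other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // Reads ids in order and stops at the first one equal to lastMessageId
    // (the assumed end-of-loop condition). Returns how many ids were read, or
    // -1 if the input runs out first; in the broker that would mean waiting
    // on the next read until the timeout fires.
    static int readUntil(MsgId lastMessageId, List<MsgId> incoming) {
        int read = 0;
        for (MsgId id : incoming) {
            read++;
            if (id.compareTo(lastMessageId) == 0) {
                return read;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<MsgId> produced = List.of(new MsgId(1544, 0), new MsgId(1544, 1));
        // Non-empty topic: lastMessageId was 1544:1 when compaction started.
        System.out.println(readUntil(new MsgId(1544, 1), produced)); // 2
        // Empty topic at start: lastMessageId is EARLIEST, which no real id equals.
        System.out.println(readUntil(MsgId.EARLIEST, produced)); // -1
    }
}
```

Every real ID compares greater than EARLIEST, so with an equality-based stop condition the loop keeps reading past any newly produced message and only the timeout ends it.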
However, I have not yet reproduced the error when compacting a non-empty topic following the steps @EugenDueck described. Could it be that reading a large message takes too long because reads are throttled? Does the problem reproduce every time? And what happens if you trigger compaction again after you see the timeout error? Could you provide a complete code snippet showing exactly how you produced the messages? Thanks! @EugenDueck
Also, the problem described by @pienio7 seems to be a different one, caused by the empty value of the message, and I'll dig into it further.
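Some background on why an empty value may behave differently: as far as I know, Pulsar's topic compaction treats a keyed message with an empty payload as a deletion marker (tombstone) for that key. The sketch below models only that rule over an in-memory list; `Msg` and `compact` are hypothetical names, and this is not the broker's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyCompactionSketch {

    // A keyed message with a raw byte payload (hypothetical model type).
    static final class Msg {
        final String key;
        final byte[] value;

        Msg(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Keeps only the latest value per key, ordered by each key's most recent
    // message; an empty payload removes the key (assumed tombstone rule).
    static Map<String, byte[]> compact(List<Msg> log) {
        Map<String, byte[]> latest = new LinkedHashMap<>();
        for (Msg m : log) {
            latest.remove(m.key);
            if (m.value.length > 0) {
                latest.put(m.key, m.value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Msg> log = new ArrayList<>();
        log.add(new Msg("asdf", new byte[] {1, 2, 3, 4, 5}));
        log.add(new Msg("asdf", new byte[] {1, 2, 3, 4, 5}));
        log.add(new Msg("other", new byte[] {9}));
        log.add(new Msg("other", new byte[0])); // empty value, like producer.send(b'')
        System.out.println(compact(log).keySet()); // [asdf]
    }
}
```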
@fantapsody
I've now stripped this down to a self-contained example and have updated the instructions to reproduce the problem. I can reliably replicate it using the snippet above.
I've verified that neither the key nor the value contents are the problem, by using a single hard-coded value for each. When sending thousands of identical messages with the same key and value at low throughput (500 msg/sec or less), the topic can be compacted. But when sending thousands of identical messages at high throughput (I could replicate it at more than 900 msg/sec), the topic becomes uncompactable.
One more observation while playing with this: when trying to compact a freshly created topic to which no messages have been sent yet, I get the same timeout error. This is perhaps not that big a deal, but IMO it should be fixed.
Thanks for all the details @EugenDueck provided; I can now reliably reproduce the problem as described. I found a few problems with log compaction here:
For problem 3, I collected the topic stats while compaction was stuck, and some metrics, such as availablePermits and lastAckedTimestamp, look anomalous. Could someone provide some insight or advice based on these metrics and the problems described? @ivankelly @sijie
{
"msgRateIn" : 311362.5386883373,
"msgThroughputIn" : 7316116.5272862585,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"averageMsgSize" : 23.497099420201696,
"storageSize" : 238074383,
"backlogSize" : 238074383,
"publishers" : [ {
"msgRateIn" : 311362.5386883373,
"msgThroughputIn" : 7316116.5272862585,
"averageMsgSize" : 23.0,
"producerId" : 0,
"metadata" : { },
"address" : "/127.0.0.1:57444",
"producerName" : "standalone-1-3",
"connectedSince" : "2020-02-05T22:29:10.529+08:00",
"clientVersion" : "2.5.0"
} ],
"subscriptions" : {
"__compaction" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"msgRateRedeliver" : 0.0,
"msgBacklog" : 11181,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 40122,
"type" : "Exclusive",
"activeConsumerName" : "3e96d",
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1580912972315,
"lastConsumedTimestamp" : 1580912972386,
"lastAckedTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"msgRateRedeliver" : 0.0,
"consumerName" : "3e96d",
"availablePermits" : -39122,
"unackedMessages" : 40122,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 1580912972386,
"metadata" : { },
"address" : "/127.0.0.1:56969",
"connectedSince" : "2020-02-05T22:29:32.314+08:00",
"clientVersion" : "2.6.0-SNAPSHOT"
} ],
"isReplicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"bytesInCounter" : 238088065,
"msgInCounter" : 9985337
}
Another point: log compaction is triggered by an event, such as every 100MB of data, right? Could it instead happen on the fly, so that each message received on the topic is also added to the compacted topic?
@KannarFr: Currently, compacting events on the fly is not supported. It is an interesting feature request to consider, although it probably belongs in a separate discussion about the features and improvements we need to add to compaction.
@sijie You are right, I will open an issue.
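To make the question concrete, here is a toy model of a size-based trigger, the "every 100MB of data" behavior described above. Everything in it is an assumption for illustration: `countTriggers` is a hypothetical name, the check runs after every message, a compaction starts when uncompacted bytes exceed the threshold, and it is assumed to clear that backlog instantly. It does not reproduce the broker's actual scheduling.

```java
public class CompactionTriggerSketch {

    // Counts how many compactions a size-threshold trigger would start while
    // messages of the given sizes are published (all rules assumed, see above).
    static int countTriggers(long thresholdBytes, long[] messageSizes) {
        long uncompactedBytes = 0;
        int triggers = 0;
        for (long size : messageSizes) {
            uncompactedBytes += size;
            if (uncompactedBytes > thresholdBytes) {
                triggers++;
                uncompactedBytes = 0; // assume compaction clears the backlog at once
            }
        }
        return triggers;
    }

    public static void main(String[] args) {
        long[] sizes = {60, 60, 60, 60, 60};
        // Threshold of 100 bytes: compactions after the 2nd and 4th message.
        System.out.println(countTriggers(100, sizes)); // 2
        // Threshold of 0: one compaction run per message.
        System.out.println(countTriggers(0, sizes)); // 5
    }
}
```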
(Looping in @codelipenghui, since he has a lot of experience dealing with permit-related issues in production.)
@fantapsody
Regarding problem 3) you posted in https://github.com/apache/pulsar/issues/6173#issuecomment-582480075, it does look like a flow-control problem.
I think the main problem is here: https://github.com/apache/pulsar/blob/402ecec9c3731711fd1bf700c30d09678aa9b1e5/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java#L144
The "permits" in Pulsar's flow-control logic are designed for "messages", not "batches": one permit per message. The broker decrements the permits after it dispatches messages. It is okay to see negative "permits" on the broker side; it just means the broker dispatched more messages than the consumer requested.
However, it is NOT okay if the consumer counts permits by batches. For example, a consumer/reader requests 100 permits (the receiver queue size), and the broker dispatches 1000 "messages" (100 batches of 10 messages each). Hence the "permits" on the broker side become -900. Since RawReaderImpl treats one batch as one permit, it will request 50 permits again from the broker once the available permits on the reader side reach the threshold (half of the receiver queue size).
The broker receives the 50 permits and increases its available permits back to -850, which is still negative. So the broker will not read any more messages, and the raw reader gets stuck waiting for new messages.
Problem 3) can be addressed by fixing https://github.com/apache/pulsar/blob/402ecec9c3731711fd1bf700c30d09678aa9b1e5/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java#L144 to use the number of messages as permits.
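The permit accounting above can be checked with a small round-based toy model. It is a sketch, not RawReaderImpl or the broker dispatcher: `PermitSimulation` and `simulate` are hypothetical names, the broker is assumed to read as many entries (batches) as it has positive permits and to charge one permit per message, and the reader returns permits once half of its receiver queue has been consumed.

```java
import java.util.ArrayDeque;

public class PermitSimulation {

    // Returns how many messages the reader receives before no further progress
    // is possible. countMessages=false: the reader returns one permit per batch
    // (the behavior described above); true: one permit per message.
    static long simulate(int receiverQueueSize, int messagesPerBatch,
                         long messagesOnTopic, boolean countMessages) {
        long brokerPermits = receiverQueueSize; // initial flow request from the reader
        long remaining = messagesOnTopic;
        long received = 0;
        int pendingPermits = 0;                 // consumed but not yet returned to the broker
        int threshold = receiverQueueSize / 2;  // reader sends a flow request at half the queue
        ArrayDeque<Integer> readerQueue = new ArrayDeque<>(); // delivered batch sizes

        while (true) {
            boolean progress = false;
            // Broker: with positive permits, read up to that many entries (batches)
            // and charge one permit per message, which can drive permits negative.
            if (brokerPermits > 0 && remaining > 0) {
                long entriesToRead = brokerPermits;
                for (long e = 0; e < entriesToRead && remaining > 0; e++) {
                    int batch = (int) Math.min(messagesPerBatch, remaining);
                    remaining -= batch;
                    brokerPermits -= batch;
                    readerQueue.add(batch);
                }
                progress = true;
            }
            // Reader: consume everything delivered, returning permits at the threshold.
            while (!readerQueue.isEmpty()) {
                int batch = readerQueue.poll();
                received += batch;
                pendingPermits += countMessages ? batch : 1;
                if (pendingPermits >= threshold) {
                    brokerPermits += pendingPermits; // flow command
                    pendingPermits = 0;
                }
                progress = true;
            }
            if (!progress) {
                return received;
            }
        }
    }

    public static void main(String[] args) {
        // 100 permits, 10 messages per batch, 5000 messages on the topic.
        System.out.println(simulate(100, 10, 5000, false)); // 1000: stuck at -800 permits
        System.out.println(simulate(100, 10, 5000, true));  // 5000: all messages delivered
    }
}
```

Counting per batch, the reader returns only 100 permits for 1000 delivered messages (-900 + 50 + 50 = -800), so the broker never dispatches again; counting per message returns 1000 permits and the topic drains. The stats dump above is at least consistent with this picture: if the compactor's reader had granted 1000 permits in total (the receiverQueueSize shown in its consumer config), then 1000 - 40122 unacked messages gives exactly the reported availablePermits of -39122.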
Besides this problem, @codelipenghui has seen a lot of "consumer stuck" problems related to permits. We have been discussing how to introduce a self-healing mechanism into the current flow-control logic to avoid similar mistakes in client implementations. I think we should discuss a more generic approach to improving flow control after fixing this issue.