Pulsar: Log compaction fails due to timeout

Created on 31 Jan 2020  ·  15 comments  ·  Source: apache/pulsar

Describe the bug
Log compaction fails due to a timeout.

To Reproduce

  1. Start pulsar standalone
  2. Create producer
  3. Produce messages at high throughput
  4. Trigger topic compaction
  5. Read compaction status

Detailed steps to reproduce:

  1. Clear all state
# rm -r data logs # !!! careful !!! this wipes all your topics etc.
  2. Start Pulsar standalone
bin/pulsar standalone
  3. Create producer
PulsarClient pulsarClient = PulsarClient.builder()
    .serviceUrl("pulsar://192.168.11.22:6650")
    .build();
Producer<byte[]> producer = pulsarClient.newProducer()
    .topic("my-topic")
    .create();
  4. Produce messages at high throughput
int msgCount = 500000;
long before = System.currentTimeMillis();
for (int i = 0; i < msgCount; i++) {
    producer.newMessage()
            .key("asdf")
            .value(new byte[] {1, 2, 3, 4, 5})
            .sendAsync();
}
System.out.println("Sent " + msgCount + " messages in " + (System.currentTimeMillis() - before) + " msec");
  5. Trigger topic compaction
$ bin/pulsar-admin topics compact persistent://public/default/my-topic
Topic compaction requested for persistent://public/default/my-topic
  6. Read compaction status
$ bin/pulsar-admin topics compaction-status persistent://public/default/my-topic
Compaction is currently running

$ # WAIT FOR 10 SECONDS

$ bin/pulsar-admin topics compaction-status persistent://public/default/my-topic
Error in compaction
null

Reason: Error compacting: java.util.concurrent.TimeoutException: Timeout

Expected behavior
Compaction completes without error.

Desktop:

  • Pulsar version: 2.5.0 (tar.gz from website)
  • OS: Linux 4.19.0-4-amd64 #1 SMP Debian 4.19.28-2 (2019-03-15) x86_64 unknown unknown GNU/Linux

Logs
Complete logs from the start of compaction until the error was read via compaction-status:

09:47:54.409 [pulsar-web-57-8] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {
  "topicNames" : [ "persistent://public/default/my-topic" ],
  "topicsPattern" : null,
  "subscriptionName" : "__compaction",
  "subscriptionType" : "Exclusive",
  "receiverQueueSize" : 1000,
  "acknowledgementsGroupTimeMicros" : 100000,
  "negativeAckRedeliveryDelayMicros" : 60000000,
  "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,
  "consumerName" : null,
  "ackTimeoutMillis" : 0,
  "tickDurationMillis" : 1000,
  "priorityLevel" : 0,
  "cryptoFailureAction" : "FAIL",
  "properties" : { },
  "readCompacted" : true,
  "subscriptionInitialPosition" : "Latest",
  "patternAutoDiscoveryPeriod" : 1,
  "regexSubscriptionMode" : "PersistentOnly",
  "deadLetterPolicy" : null,
  "autoUpdatePartitions" : true,
  "replicateSubscriptionState" : false,
  "resetIncludeHead" : false,
  "keySharedPolicy" : null
}
09:47:54.410 [pulsar-web-57-8] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
  "serviceUrl" : "pulsar://localhost:6650",
  "authPluginClassName" : null,
  "authParams" : null,
  "operationTimeoutMs" : 30000,
  "statsIntervalSeconds" : 60,
  "numIoThreads" : 1,
  "numListenerThreads" : 1,
  "connectionsPerBroker" : 1,
  "useTcpNoDelay" : true,
  "useTls" : false,
  "tlsTrustCertsFilePath" : null,
  "tlsAllowInsecureConnection" : false,
  "tlsHostnameVerificationEnable" : false,
  "concurrentLookupRequest" : 5000,
  "maxLookupRequest" : 50000,
  "maxNumberOfRejectedRequestPerConnection" : 50,
  "keepAliveIntervalSeconds" : 30,
  "connectionTimeoutMs" : 10000,
  "requestTimeoutMs" : 60000,
  "initialBackoffIntervalNanos" : 100000000,
  "maxBackoffIntervalNanos" : 60000000000
}
09:47:54.411 [pulsar-web-57-8] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [31/1/2020:09:47:54 +0900] "PUT /admin/v2/persistent/public/default/my-topic/compaction HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.5.0" 6
09:47:54.411 [pulsar-client-io-85-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Subscribing to topic on cnx [id: 0xeb973738, L:/127.0.0.1:60180 - R:localhost/127.0.0.1:6650]
09:47:54.412 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Subscribing on topic persistent://public/default/my-topic / __compaction
09:47:54.412 [pulsar-io-50-7] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-__compaction] Rewind from 1544:100 to 1544:0
09:47:54.412 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic] There are no replicated subscriptions on the topic
09:47:54.412 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic][__compaction] Created new subscription for 1
09:47:54.412 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Created subscription on topic persistent://public/default/my-topic / __compaction
09:47:54.413 [pulsar-client-io-85-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
09:47:54.413 [pulsar-client-io-85-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Seek subscription to message id -1:-1:-1
09:47:54.415 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.Consumer - Disconnecting consumer: Consumer{subscription=CompactorSubscription{topic=persistent://public/default/my-topic, name=__compaction}, consumerId=1, consumerName=b411f, address=/127.0.0.1:60180}
09:47:54.416 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/my-topic, name=__compaction}, consumerId=1, consumerName=b411f, address=/127.0.0.1:60180}
09:47:54.416 [pulsar-client-io-85-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:6650] Broker notification of Closed consumer: 1
09:47:54.416 [pulsar-client-io-85-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/my-topic] [__compaction] Closed connection [id: 0xeb973738, L:/127.0.0.1:60180 - R:localhost/127.0.0.1:6650] -- Will try again in 0.1 s
09:47:54.416 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.persistent.PersistentSubscription - [persistent://public/default/my-topic][__compaction] Successfully disconnected consumers from subscription, proceeding with cursor reset
09:47:54.416 [bookkeeper-ml-workers-OrderedExecutor-2-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic] Initiate reset position to 1544:-1 on cursor __compaction
09:47:54.424 [BookKeeperClientWorker-OrderedExecutor-1-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic] reset position to 1544:-1 before current read position 1544:0 on cursor __compaction
09:47:54.425 [BookKeeperClientWorker-OrderedExecutor-1-0] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] [persistent://public/default/my-topic][__compaction] Reset subscription to message id -1:-1
09:47:54.426 [pulsar-client-io-85-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Successfully reset subscription to message id -1:-1:-1
09:47:54.516 [pulsar-timer-87-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://public/default/my-topic] [__compaction] Reconnecting after timeout
09:47:54.517 [pulsar-client-io-85-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Subscribing to topic on cnx [id: 0xeb973738, L:/127.0.0.1:60180 - R:localhost/127.0.0.1:6650]
09:47:54.517 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Subscribing on topic persistent://public/default/my-topic / __compaction
09:47:54.518 [pulsar-io-50-7] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/my-topic-__compaction] Rewind from 1544:-1 to 1544:0
09:47:54.518 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic] There are no replicated subscriptions on the topic
09:47:54.518 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/my-topic][__compaction] Created new subscription for 1
09:47:54.518 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Created subscription on topic persistent://public/default/my-topic / __compaction
09:47:54.518 [pulsar-client-io-85-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 1
09:47:54.526 [pulsar-external-listener-86-1] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic] [__compaction] Could not get connection while getLastMessageId -- Will try again in 100 ms
09:47:54.527 [pulsar-external-listener-86-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Get topic last message Id
09:47:54.527 [pulsar-client-io-85-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic][__compaction] Successfully getLastMessageId 1544:37344
09:47:54.527 [pulsar-client-io-85-1] INFO  org.apache.pulsar.compaction.TwoPhaseCompactor - Commencing phase one of compaction for persistent://public/default/my-topic, reading to 1544:37344:-1
09:47:57.416 [pulsar-web-57-12] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [31/1/2020:09:47:57 +0900] "GET /admin/v2/persistent/public/functions/coordinate/stats HTTP/1.1" 200 887 "-" "Pulsar-Java-v2.5.0" 5
09:48:02.838 [pulsar-web-57-1] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [31/1/2020:09:48:02 +0900] "GET /admin/v2/persistent/public/default/my-topic/compaction HTTP/1.1" 200 35 "-" "Pulsar-Java-v2.5.0" 2
09:48:04.646 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Closing consumer: 1
09:48:04.646 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/my-topic, name=__compaction}, consumerId=1, consumerName=b411f, address=/127.0.0.1:60180}
09:48:04.646 [pulsar-io-50-7] INFO  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:60180] Closed consumer Consumer{subscription=CompactorSubscription{topic=persistent://public/default/my-topic, name=__compaction}, consumerId=1, consumerName=b411f, address=/127.0.0.1:60180}
09:48:04.646 [pulsar-client-io-85-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/my-topic] [__compaction] Closed consumer
09:48:05.941 [pulsar-web-57-3] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [31/1/2020:09:48:05 +0900] "GET /admin/v2/persistent/public/default/my-topic/compaction HTTP/1.1" 200 79 "-" "Pulsar-Java-v2.5.0" 2
09:48:27.417 [pulsar-web-57-6] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [31/1/2020:09:48:27 +0900] "GET /admin/v2/persistent/public/functions/coordinate/stats HTTP/1.1" 200 887 "-" "Pulsar-Java-v2.5.0" 5


@EugenDueck
I tried on my laptop, but it's hard to reproduce. Here are my steps to reproduce:

 ✘ lipenghui@lipenghuideMacBook-Pro  ~/Downloads/apache-pulsar-2.5.0/bin   master ●✚  ./pulsar-client produce -m '{}' test
10:37:17.928 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xb6f73439, L:/127.0.0.1:53315 - R:localhost/127.0.0.1:6650]] Connected to server
10:37:18.915 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {
  "topicName" : "test",
  "producerName" : null,
  "sendTimeoutMs" : 30000,
  "blockIfQueueFull" : false,
  "maxPendingMessages" : 1000,
  "maxPendingMessagesAcrossPartitions" : 50000,
  "messageRoutingMode" : "RoundRobinPartition",
  "hashingScheme" : "JavaStringHash",
  "cryptoFailureAction" : "FAIL",
  "batchingMaxPublishDelayMicros" : 1000,
  "batchingPartitionSwitchFrequencyByPublishDelay" : 10,
  "batchingMaxMessages" : 1000,
  "batchingMaxBytes" : 131072,
  "batchingEnabled" : true,
  "compressionType" : "NONE",
  "initialSequenceId" : null,
  "autoUpdatePartitions" : true,
  "multiSchema" : true,
  "properties" : { }
}
10:37:18.920 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {
  "serviceUrl" : "pulsar://localhost:6650/",
  "authPluginClassName" : null,
  "authParams" : null,
  "operationTimeoutMs" : 30000,
  "statsIntervalSeconds" : 60,
  "numIoThreads" : 1,
  "numListenerThreads" : 1,
  "connectionsPerBroker" : 1,
  "useTcpNoDelay" : true,
  "useTls" : false,
  "tlsTrustCertsFilePath" : "",
  "tlsAllowInsecureConnection" : false,
  "tlsHostnameVerificationEnable" : false,
  "concurrentLookupRequest" : 5000,
  "maxLookupRequest" : 50000,
  "maxNumberOfRejectedRequestPerConnection" : 50,
  "keepAliveIntervalSeconds" : 30,
  "connectionTimeoutMs" : 10000,
  "requestTimeoutMs" : 60000,
  "initialBackoffIntervalNanos" : 100000000,
  "maxBackoffIntervalNanos" : 60000000000
}
10:37:18.989 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [test] [null] Creating producer on cnx [id: 0xb6f73439, L:/127.0.0.1:53315 - R:localhost/127.0.0.1:6650]
10:37:19.053 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [test] [standalone-0-2] Created producer on cnx [id: 0xb6f73439, L:/127.0.0.1:53315 - R:localhost/127.0.0.1:6650]
10:37:19.094 [pulsar-timer-4-1] WARN  com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
10:37:19.161 [main] INFO  org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
10:37:19.190 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [test] [standalone-0-2] Closed Producer
10:37:19.214 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0xb6f73439, L:/127.0.0.1:53315 ! R:localhost/127.0.0.1:6650] Disconnected
10:37:19.255 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
 lipenghui@lipenghuideMacBook-Pro  ~/Downloads/apache-pulsar-2.5.0/bin   master ●✚  ./pulsar-client produce -m '{}' test -n 10
10:37:25.969 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0x0d76dfd4, L:/127.0.0.1:53316 - R:localhost/127.0.0.1:6650]] Connected to server
10:37:26.548 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {
  "topicName" : "test",
  "producerName" : null,
  "sendTimeoutMs" : 30000,
  "blockIfQueueFull" : false,
  "maxPendingMessages" : 1000,
  "maxPendingMessagesAcrossPartitions" : 50000,
  "messageRoutingMode" : "RoundRobinPartition",
  "hashingScheme" : "JavaStringHash",
  "cryptoFailureAction" : "FAIL",
  "batchingMaxPublishDelayMicros" : 1000,
  "batchingPartitionSwitchFrequencyByPublishDelay" : 10,
  "batchingMaxMessages" : 1000,
  "batchingMaxBytes" : 131072,
  "batchingEnabled" : true,
  "compressionType" : "NONE",
  "initialSequenceId" : null,
  "autoUpdatePartitions" : true,
  "multiSchema" : true,
  "properties" : { }
}
10:37:26.558 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {
  "serviceUrl" : "pulsar://localhost:6650/",
  "authPluginClassName" : null,
  "authParams" : null,
  "operationTimeoutMs" : 30000,
  "statsIntervalSeconds" : 60,
  "numIoThreads" : 1,
  "numListenerThreads" : 1,
  "connectionsPerBroker" : 1,
  "useTcpNoDelay" : true,
  "useTls" : false,
  "tlsTrustCertsFilePath" : "",
  "tlsAllowInsecureConnection" : false,
  "tlsHostnameVerificationEnable" : false,
  "concurrentLookupRequest" : 5000,
  "maxLookupRequest" : 50000,
  "maxNumberOfRejectedRequestPerConnection" : 50,
  "keepAliveIntervalSeconds" : 30,
  "connectionTimeoutMs" : 10000,
  "requestTimeoutMs" : 60000,
  "initialBackoffIntervalNanos" : 100000000,
  "maxBackoffIntervalNanos" : 60000000000
}
10:37:26.593 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [test] [null] Creating producer on cnx [id: 0x0d76dfd4, L:/127.0.0.1:53316 - R:localhost/127.0.0.1:6650]
10:37:26.641 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [test] [standalone-0-3] Created producer on cnx [id: 0x0d76dfd4, L:/127.0.0.1:53316 - R:localhost/127.0.0.1:6650]
10:37:26.680 [main] WARN  com.scurrilous.circe.checksum.Crc32cIntChecksum - Failed to load Circe JNI library. Falling back to Java based CRC32c provider
10:37:26.772 [main] INFO  org.apache.pulsar.client.impl.PulsarClientImpl - Client closing. URL: pulsar://localhost:6650/
10:37:26.797 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [test] [standalone-0-3] Closed Producer
10:37:26.803 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0x0d76dfd4, L:/127.0.0.1:53316 ! R:localhost/127.0.0.1:6650] Disconnected
10:37:26.831 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 10 messages successfully produced
 lipenghui@lipenghuideMacBook-Pro  ~/Downloads/apache-pulsar-2.5.0/bin   master ●✚  ./pulsar-admin topics compact test
Topic compaction requested for persistent://public/default/test
 ✘ lipenghui@lipenghuideMacBook-Pro  ~/Downloads/apache-pulsar-2.5.0/bin   master ●✚  ./pulsar-admin topics compaction-status test
Compaction was a success
 lipenghui@lipenghuideMacBook-Pro  ~/Downloads/apache-pulsar-2.5.0/bin   master ●✚  ./pulsar-admin topics compaction-status test
Compaction was a success

Am I missing some information?

@codelipenghui KannarFr over on Slack (#general) is seeing the same problem, so it's certainly not just me.
Did you create your producer exactly as I did, and did you produce messages in the same way? (See "Detailed steps to reproduce" above.)

I have the same timeout problem. It occurs when I add an empty message to a topic and then start compaction on it.
After the problem appears, I can't read the compacted topic until I reset the broker.

@pienio7 What do you mean by "empty"? Is the value of the message empty? What is the data type of your message?

@EugenDueck Yes, the value of message is empty.
producer.send(''.encode('utf-8'), partition_key=key)

@pienio7 So I guess this is Python, and the data type of your topic is bytes (a byte array), an empty one in this case.

@EugenDueck Yes

Hi, in my case the topic is empty. Either way, the error message displayed is not meaningful, though I understand that compaction fails if the topic is empty.

The error can be reproduced if the topic is empty, as described by @KannarFr: the compactor tries to read the next message, which is not there, so a scheduled timeout kicks in after ten seconds and completes the compaction with a TimeoutException:
https://github.com/apache/pulsar/blob/13d8ecd20a3c6795405fbf5946c1907e9c90dd91/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java#L106
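A minimal sketch of the timeout pattern described above, using only `java.util.concurrent`. This is not the code from `TwoPhaseCompactor`; the helper name `readNextWithTimeout` and the 100 ms delay in `main` are illustrative assumptions. It only shows the general shape: a pending "read next message" future that a scheduled task fails with a `TimeoutException` if nothing completes it first.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ScheduledTimeoutSketch {
    /**
     * Hypothetical helper: returns a future standing in for "read the next message".
     * A task scheduled on the given executor fails it with TimeoutException("Timeout")
     * unless something completes it first.
     */
    static CompletableFuture<String> readNextWithTimeout(ScheduledExecutorService scheduler,
                                                         long timeoutMillis) {
        CompletableFuture<String> next = new CompletableFuture<>();
        ScheduledFuture<?> timer = scheduler.schedule(
                () -> next.completeExceptionally(new TimeoutException("Timeout")),
                timeoutMillis, TimeUnit.MILLISECONDS);
        // Cancel the timer if the read finishes first (e.g. a message arrives).
        next.whenComplete((value, error) -> timer.cancel(false));
        return next;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Empty topic: nothing ever completes the read, so the scheduled task fails it.
        readNextWithTimeout(scheduler, 100)
                .whenComplete((value, error) -> System.out.println("Error compacting: " + error))
                .exceptionally(error -> null) // swallow the failure so join() just waits
                .join();
        scheduler.shutdown();
    }
}
```

With no message to complete the read, the scheduled task wins and the caller sees a `TimeoutException`, similar to the `Reason:` line in the report.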

And even if a message is produced to the empty topic after compaction has started, compaction still fails with a TimeoutException, because the condition that ends the async loop never becomes true: in this case lastMessageId is MessageId.earliest, which is always less than the ID of the message just received:
https://github.com/apache/pulsar/blob/13d8ecd20a3c6795405fbf5946c1907e9c90dd91/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java#L139
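To make the loop-ending condition concrete, here is a simplified model, assuming (as the comment above says) that the phase-one loop only stops when the ID just read equals `lastMessageId`. Message IDs are modeled as plain `long` positions and `EARLIEST = -1` stands in for `MessageId.earliest`; both are simplifications, not Pulsar types.

```java
import java.util.List;

final class PhaseOneSketch {
    /** Hypothetical stand-in for MessageId.earliest; real positions are >= 0 in this model. */
    static final long EARLIEST = -1L;

    /**
     * Simplified phase-one loop over message positions: stop once the position just read
     * equals lastMessageId. Returns true if that happened, false if the loop ran out of
     * messages, which is where the real compactor keeps waiting until its timeout fires.
     */
    static boolean reachesLastMessage(List<Long> messages, long lastMessageId) {
        for (long id : messages) {
            if (id == lastMessageId) {
                return true;  // end-of-loop condition met: phase one completes
            }
        }
        return false;         // no match: would end in TimeoutException
    }

    public static void main(String[] args) {
        System.out.println(reachesLastMessage(List.of(0L, 1L, 2L), 2L)); // non-empty topic
        System.out.println(reachesLastMessage(List.of(), EARLIEST));     // empty topic
        System.out.println(reachesLastMessage(List.of(0L), EARLIEST));   // message produced after compaction started
    }
}
```

The three calls print `true`, `false`, `false`: only the non-empty topic ever reaches `lastMessageId`, while a late message on an originally empty topic can never equal `EARLIEST`.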

However, I have not yet reproduced the error when compacting a non-empty topic by following the steps @EugenDueck described. Could it be that reading a large message takes too long because reading is throttled? Does the problem reproduce every time? And what happens if you trigger compaction again after you see the timeout error? Could you provide a complete code snippet showing exactly how you produced the messages? Thanks! @EugenDueck

Also, the problem described by @pienio7 seems to be a different one, caused by the empty message value; I'll dig into it further.

@fantapsody

I've now stripped this down to a self-contained example and have updated the instructions to reproduce the problem. I can reliably replicate it using the snippet above.

I've verified that neither the key nor the value contents are the problem, by using a single, hard-coded value for each. When sending thousands of identical messages (same key and value) at low throughput (500 msg/sec or less), the topic can be compacted. But when sending thousands of identical messages at high throughput (I could replicate it at more than 900 msg/sec), the topic becomes uncompactable.

One more observation while playing with this: when trying to compact a freshly created topic to which no messages have been sent yet, I get the same timeout error. This is perhaps not that big a deal, but IMO it should be fixed.

Thanks for all the details @EugenDueck provided, I can now reliably reproduce the problem as described. I found a few problems with log compaction here:

  1. Fails for empty topics. This can be fixed by an additional check before compaction.
  2. Cannot finish the compaction if the value of the last message is empty when the compaction is triggered. This can be fixed by adding the missing check in phase 2 of the compaction. Please note that a message with an empty value indicates a deletion @pienio7
  3. Fails under massive writes. This is a bit complicated: the raw reader for compaction stops reading new messages after the first batch (100 entries) of raw messages (which may contain many more sub-messages), and increasing the timeout duration doesn't help. There seems to be something wrong with flow control here, and I need to do more testing and analysis.
    https://github.com/apache/pulsar/blob/13d8ecd20a3c6795405fbf5946c1907e9c90dd91/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java#L106
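A simplified, single-pass sketch of what compaction should produce for problems 1 and 2, assuming an in-memory list of messages (the `Msg` type is hypothetical). Real compaction is two-phase and works on ledgers; this only illustrates the empty-topic check and the rule that an empty value deletes its key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CompactionSketch {
    /** Hypothetical message type: a key plus a possibly empty value. */
    static final class Msg {
        final String key;
        final byte[] value;

        Msg(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Returns the latest non-empty value per key; an empty value acts as a deletion. */
    static Map<String, byte[]> compact(List<Msg> topic) {
        Map<String, byte[]> latest = new LinkedHashMap<>();
        if (topic.isEmpty()) {
            return latest;  // problem 1: nothing to read, so finish at once instead of waiting
        }
        for (Msg m : topic) {
            if (m.value.length == 0) {
                latest.remove(m.key);  // problem 2: empty value deletes the key, even for the last message
            } else {
                latest.put(m.key, m.value);
            }
        }
        return latest;
    }
}
```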

For problem 3, I collected the topic stats while the compaction was stuck, and some metrics, such as availablePermits and lastAckedTimestamp, seem anomalous. Maybe someone could provide some insight or advice based on these metrics and the problems described? @ivankelly @sijie

{
  "msgRateIn" : 311362.5386883373,
  "msgThroughputIn" : 7316116.5272862585,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 23.497099420201696,
  "storageSize" : 238074383,
  "backlogSize" : 238074383,
  "publishers" : [ {
    "msgRateIn" : 311362.5386883373,
    "msgThroughputIn" : 7316116.5272862585,
    "averageMsgSize" : 23.0,
    "producerId" : 0,
    "metadata" : { },
    "address" : "/127.0.0.1:57444",
    "producerName" : "standalone-1-3",
    "connectedSince" : "2020-02-05T22:29:10.529+08:00",
    "clientVersion" : "2.5.0"
  } ],
  "subscriptions" : {
    "__compaction" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 11181,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 40122,
      "type" : "Exclusive",
      "activeConsumerName" : "3e96d",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1580912972315,
      "lastConsumedTimestamp" : 1580912972386,
      "lastAckedTimestamp" : 0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "3e96d",
        "availablePermits" : -39122,
        "unackedMessages" : 40122,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 1580912972386,
        "metadata" : { },
        "address" : "/127.0.0.1:56969",
        "connectedSince" : "2020-02-05T22:29:32.314+08:00",
        "clientVersion" : "2.6.0-SNAPSHOT"
      } ],
      "isReplicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled",
  "bytesInCounter" : 238088065,
  "msgInCounter" : 9985337
}
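A quick consistency check on the consumer numbers above, assuming the broker's `availablePermits` equals the permits the reader has granted minus the messages dispatched to it, and that `unackedMessages` counts everything dispatched so far (plausible here, since `lastAckedTimestamp` is 0):

```latex
\text{permits granted} = \text{availablePermits} + \text{unackedMessages} = -39122 + 40122 = 1000
```

Under those assumptions, the reader appears to have granted about 1,000 permits in total while roughly 40 times as many messages were dispatched to it.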

Another point: log compaction is triggered by an event, such as every 100 MB of data, right? Could it run on the fly instead, so that each message received on the topic is also added to the compacted topic?

@KannarFr: Compacting events on the fly is currently not supported. It is an interesting feature request to consider, although it probably belongs in a separate discussion about the features and improvements we need to add to compaction.

@sijie You are right, I will write an issue.

(looping in @codelipenghui, since he has a lot of experience dealing with permit-related issues in production)

@fantapsody

Regarding problem 3) you posted in https://github.com/apache/pulsar/issues/6173#issuecomment-582480075, it does look like a flow-control problem.

I think the main problem is here: https://github.com/apache/pulsar/blob/402ecec9c3731711fd1bf700c30d09678aa9b1e5/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java#L144

The "permits" in Pulsar's flow-control logic are designed for "messages", not "batches", i.e. one permit per message. So the broker decrements the permits after it dispatches messages. It is okay to see negative "permits" on the broker side: it means the broker dispatched more messages than the consumer requested.

However, it is NOT okay if the consumer counts permits by batches. For example, a consumer/reader requests 100 permits (the receiver queue size) and the broker dispatches 1000 "messages" (10 messages per batch), so the broker-side "permits" become -900. Since RawReaderImpl treats one permit as one batch, it requests only 50 more permits from the broker once the reader-side available permits reach the threshold (half of the receiver queue size).

The broker receives the 50 permits, and its available permits rise only to -850, which is still negative. So the broker will not read any more messages, and the raw reader gets stuck waiting for new messages.
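The scenario above can be replayed with a small, hypothetical simulation. The constants mirror the example (100 permits, 10 messages per batch); `MAX_READ_ENTRIES` and the rule that the broker reads at most one entry per available permit are assumptions about dispatcher behavior, not Pulsar code. The reader here returns permits in chunks of half the queue, so in the per-batch case the broker goes from -900 to -850 to -800 and then stops reading.

```java
final class FlowControlSketch {
    static final int RECEIVER_QUEUE_SIZE = 100; // permits the reader grants up front (example value)
    static final int MESSAGES_PER_ENTRY = 10;   // messages packed into each batch/entry (example value)
    static final int MAX_READ_ENTRIES = 100;    // assumed cap on entries the broker reads per round

    /**
     * Simulates a reader draining a large backlog of batched entries.
     *
     * @param permitsPerEntry true models the reader counting one permit per batch (the bug),
     *                        false models one permit per message
     * @param maxRounds       number of broker read rounds to simulate
     * @return messages delivered to the reader before it stalls or maxRounds ends
     */
    static long delivered(boolean permitsPerEntry, int maxRounds) {
        int brokerPermits = RECEIVER_QUEUE_SIZE;  // initial flow request; the broker counts messages
        int consumedNotReturned = 0;              // units consumed but not yet re-requested as permits
        int threshold = RECEIVER_QUEUE_SIZE / 2;  // reader re-requests in chunks of half the queue
        long delivered = 0;
        for (int round = 0; round < maxRounds; round++) {
            if (brokerPermits <= 0) {
                break; // broker stops reading: the reader waits forever for new messages
            }
            // Broker reads up to one entry per available permit...
            int entries = Math.min(brokerPermits, MAX_READ_ENTRIES);
            int messages = entries * MESSAGES_PER_ENTRY;
            // ...but charges one permit per dispatched message.
            brokerPermits -= messages;
            delivered += messages;

            // Reader consumes everything and sends flow requests of `threshold` permits.
            consumedNotReturned += permitsPerEntry ? entries : messages;
            while (consumedNotReturned >= threshold) {
                brokerPermits += threshold;
                consumedNotReturned -= threshold;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("per-entry permits:   " + delivered(true, 5));
        System.out.println("per-message permits: " + delivered(false, 5));
    }
}
```

With per-batch accounting the reader receives the first 1000 messages and then stalls; with per-message accounting each round returns exactly the permits that were consumed, so delivery continues (5000 messages after five rounds).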

Problem 3) can be addressed by fixing https://github.com/apache/pulsar/blob/402ecec9c3731711fd1bf700c30d09678aa9b1e5/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java#L144 to use the number of messages as permits.
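A minimal sketch of the per-message accounting this fix implies. `MessagePermitTracker` is a hypothetical class, not the actual `RawReaderImpl` change; it assumes the reader can learn how many messages each entry holds (for a batch, the count carried in its metadata) and passes that number in.

```java
/** Hypothetical reader-side permit accounting that counts messages rather than entries. */
final class MessagePermitTracker {
    private final int threshold;        // re-request once half the receiver queue is consumed
    private int consumedSinceLastFlow;  // messages consumed since the last flow request

    MessagePermitTracker(int receiverQueueSize) {
        this.threshold = Math.max(1, receiverQueueSize / 2);
    }

    /**
     * Records one consumed entry holding numMessages messages (1 for a non-batched entry).
     * Returns the number of permits to send to the broker now, or 0 if still below the threshold.
     */
    int onEntryConsumed(int numMessages) {
        consumedSinceLastFlow += numMessages;
        if (consumedSinceLastFlow < threshold) {
            return 0;
        }
        int permits = consumedSinceLastFlow; // give back exactly what was consumed
        consumedSinceLastFlow = 0;
        return permits;
    }
}
```

Because the permits returned always equal the messages consumed, the broker's counter climbs back to where it started instead of staying negative.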


Besides this problem, @codelipenghui has seen a lot of "consumer stuck" problems related to permits. We have been discussing how to introduce a self-healing mechanism into the current flow-control logic to avoid similar mistakes in client implementations. I think we should discuss a more generic approach to improving flow control after fixing this issue.
