Confluent-kafka-dotnet: Kafka consumer - lag is always 1 per partition, receiving duplicate last message per partition

Created on 10 Aug 2020 · 6 Comments · Source: confluentinc/confluent-kafka-dotnet

Description

I have subscribed to a topic with 4 partitions. The consumer receives the last message of each partition again as a duplicate. The consumer has read everything in the topic, yet the lag is shown as 1 per partition, and when I restart the consumer the last message of each partition is received again. There are 3 messages per partition in the topic, and nothing is being produced to the topic, but the behavior stays the same.
Please suggest.

How to reproduce

The offsets are committed from another thread every 10 seconds, if there is anything to commit.
_consumer.Subscribe(topics);
var commitTask = Task.Run(() => { CommitThread(_consumer, pc); }, _options.CancellationToken);
while (!IsCancelled(0))
{
    try
    {
        var consumeResult = _consumer.Consume(_options.CancellationToken);
        if (consumeResult.IsPartitionEOF) continue;

        AddMessageForProcessing(consumeResult);
    }
    catch (ConsumeException consumeException)
    {
        LogKafkaException(_consumer, consumeException);
    }
}

// Here, processor holds all the successfully read offsets that are to be committed.

private void CommitOffsets(IConsumer consumer, KafkaParallelProcessor processor)
{
    var offsets = processor.GetOffsetsToCommit();
    if (!offsets.Any()) return;

    try
    {
        try
        {
            foreach (var topicPartitionOffset in offsets)
            {
                consumer.StoreOffset(topicPartitionOffset);
            }

            processor.SaveCommittedOffsets(offsets);

            _logger.LogInformation($"Committed: {offsets.FormatOffsets()}");
        }
        catch (KafkaException e)
        {
            _logger.LogInformation($"Unable to commit offsets. Reason: {e.Error.Reason}\nOffsets: {offsets.FormatOffsets()}");
            throw;
        }
    }
    catch (Exception e)
    {
        _logger.LogError($"Unable to commit offsets: {offsets.FormatOffsets()}", e);
    }
}

Checklist

Please provide the following information:

  • [ ] A complete (i.e. we can run it), minimal program demonstrating the problem. No need to supply a project file.
  • Confluent.Kafka nuget version: 1.4.3
  • [ ] Apache Kafka version.
  • Client configuration:
    "group.id": "Test_13",
    "bootstrap.servers": "servers",
    "session.timeout.ms": "120000",
    "heartbeat.interval.ms": "15000",
    "api.version.request": "true",
    "fetch.message.max.bytes": "102400",
    "log.connection.close": "false",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": true,
    "enable.auto.offset.store": false
  • Operating system: Windows
  • [ ] Provide logs (with "debug" : "..." as necessary in configuration).
  • [ ] Provide broker log excerpts.
  • Critical issue: Yes
question

All 6 comments

You enabled auto-commit. It's expected to receive duplicates. You can read more here.

BTW, I don't think you need to store the offset manually as you are not doing anything differently from auto offset storing.

I did try disabling auto-commit and committing manually from a different thread using consumer.CommitOffsets(offsets); but I am still getting a duplicate message when the consumer restarts. I don't want duplicate messages to be processed. How can I solve this scenario?

I didn't look at your code, but this behavior is often due to a misunderstanding of what committed offsets represent: a committed offset is interpreted as the _next message to consume_, i.e. the offset of the last consumed message + 1.
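To make the arithmetic concrete, here is a small sketch using the numbers from the question (3 messages per partition, so offsets 0, 1 and 2, and a log end offset of 3). The class and method names are hypothetical and exist only for illustration; the sketch also assumes lag is computed the way monitoring tools usually report it, as log end offset minus committed offset.

```csharp
// Illustrative helper only: these names are not part of Confluent.Kafka.
public static class OffsetSemantics
{
    // A committed offset means "next message to consume", so after fully
    // processing the message at lastProcessedOffset, commit that offset + 1.
    public static long NextOffsetToCommit(long lastProcessedOffset) =>
        lastProcessedOffset + 1;

    // Assumption: lag = log end offset - committed offset for one partition.
    public static long Lag(long logEndOffset, long committedOffset) =>
        logEndOffset - committedOffset;
}
```

With a log end offset of 3, `OffsetSemantics.Lag(3, 2)` is 1: committing the last processed offset (2) as-is leaves one message "unread", and that message is delivered again after a restart. `OffsetSemantics.Lag(3, OffsetSemantics.NextOffsetToCommit(2))` is 0, which is the state the question expects.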

I'm experiencing a similar issue. Everything works fine when each message is handled separately and its offset is committed individually.
But when, for example, handling a batch of 100 messages and committing only the highest offset per partition, the issue appears.

Flow:

  1. The consumer commits the highest offset of each partition
  2. No messages are left to consume
  3. Restart the application
  4. It re-consumes the last message of every partition (with 3 partitions, 3 messages are re-consumed)
  5. Repeating step 3 always leads to step 4, unless new messages arrive for the consumer, in which case it starts again from step 1.

Batch handling mostly takes under 10 seconds.
Consumer settings:

EnableAutoCommit = false,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnablePartitionEof = true,
HeartbeatIntervalMs = 10 secs
SessionTimeoutMs = 5 mins
MaxPollIntervalMs = 2 hours
BootstrapServers = "servers"

For committing multiple messages I use this method:
void Commit(IEnumerable<TopicPartitionOffset> offsets);
I get no exception. The commit runs in a while loop with try/catch, and if I get an exception I try to commit the same message again.

I also tried committing all messages separately in a foreach with this method:
void Commit(ConsumeResult<TKey, TValue> result);
The same thing happened.

Only when messages are handled one at a time and committed one by one does an app restart not repeat the last message.
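For the batch case described above, one possible approach is to derive a single commit position per partition from the processed batch: the highest processed offset plus one. The sketch below assumes the batch is kept as a collection of `ConsumeResult` objects; the `BatchOffsets` class and its method names are hypothetical, while `Commit(IEnumerable<TopicPartitionOffset>)` is the Confluent.Kafka method mentioned in the comment.

```csharp
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

// Hypothetical helper for illustration; not part of Confluent.Kafka.
public static class BatchOffsets
{
    // For each partition in the batch, return (highest processed offset + 1),
    // i.e. the position of the next message to consume.
    public static List<TopicPartitionOffset> OffsetsToCommit<TKey, TValue>(
        IEnumerable<ConsumeResult<TKey, TValue>> processedBatch)
    {
        return processedBatch
            // Partition EOF events carry no message, so they are skipped here.
            .Where(r => !r.IsPartitionEOF)
            .GroupBy(r => r.TopicPartition)
            .Select(g => new TopicPartitionOffset(
                g.Key,
                new Offset(g.Max(r => r.Offset.Value) + 1)))
            .ToList();
    }

    // Commits the computed positions synchronously, if there are any.
    public static void CommitBatch<TKey, TValue>(
        IConsumer<TKey, TValue> consumer,
        IEnumerable<ConsumeResult<TKey, TValue>> processedBatch)
    {
        var offsets = OffsetsToCommit(processedBatch);
        if (offsets.Count > 0)
        {
            consumer.Commit(offsets);
        }
    }
}
```

Calling `BatchOffsets.CommitBatch(consumer, batch)` once the whole batch has been handled should leave the committed position just past the last processed message of each partition, so a restart has nothing left to re-consume.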

How are you getting the offsets in var offsets = processor.GetOffsetsToCommit(); ?
You need to increment the offset manually if you use the StoreOffset(TopicPartitionOffset) method, because that overload stores the offset exactly as given.
Have a look at https://github.com/confluentinc/confluent-kafka-dotnet/blob/25f320a672b4324d732304cb4efa2288867b320c/src/Confluent.Kafka/Consumer.cs#L338
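As a sketch of that advice, assuming `GetOffsetsToCommit()` in the question returns the offsets of the last processed messages (the thread does not confirm this), each offset can be advanced by one before it is stored. The `OffsetStoring` class and its method names are hypothetical; `StoreOffset(TopicPartitionOffset)` is the Confluent.Kafka method discussed above.

```csharp
using System.Collections.Generic;
using Confluent.Kafka;

// Hypothetical helper for illustration; not part of Confluent.Kafka.
public static class OffsetStoring
{
    // Turns "offset of the last processed message" into
    // "offset of the next message to consume".
    public static TopicPartitionOffset NextPosition(TopicPartitionOffset lastProcessed) =>
        new TopicPartitionOffset(lastProcessed.TopicPartition, lastProcessed.Offset + 1);

    // Stores the incremented offsets. With enable.auto.commit=true and
    // enable.auto.offset.store=false (the configuration in the question),
    // the stored offsets are then committed by the auto-commit mechanism.
    public static void StoreProcessed<TKey, TValue>(
        IConsumer<TKey, TValue> consumer,
        IEnumerable<TopicPartitionOffset> lastProcessedOffsets)
    {
        foreach (var lastProcessed in lastProcessedOffsets)
        {
            consumer.StoreOffset(NextPosition(lastProcessed));
        }
    }
}
```

If the offsets handed to `StoreProcessed` are already "next to consume" positions, the extra increment would skip a message, so it is worth checking which of the two meanings the processor actually tracks.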

This was exactly my problem. Setting the committed offset to last consumed message + 1 as @mhowlett explained fixed it. Thanks!
