I have subscribed to a topic with 4 partitions. The consumer receives the last message of each partition twice. It has read everything in the topic, yet it shows a lag of 1 per partition, and when I restart the consumer the last message is received again. There are 3 messages per partition and nothing is being produced to the topic, but the behavior stays the same.
Any suggestions?
Offsets are committed from a separate thread every 10 seconds, if there is anything to commit:
```csharp
_consumer.Subscribe(topics);
var commitTask = Task.Run(() => { CommitThread(_consumer, pc); }, _options.CancellationToken);

while (!IsCancelled(0))
{
    try
    {
        var consumeResult = _consumer.Consume(_options.CancellationToken);
        if (consumeResult.IsPartitionEOF) continue;
        AddMessageForProcessing(consumeResult);
    }
    catch (ConsumeException consumeException)
    {
        LogKafkaException(_consumer, consumeException);
    }
}
```
```csharp
// Here the processor holds all successfully read offsets that are to be committed.
private void CommitOffsets(IConsumer
{
    var offsets = processor.GetOffsetsToCommit();
    if (!offsets.Any()) return;
    try
    {
        try
        {
            foreach (var topicPartitionOffset in offsets)
            {
                consumer.StoreOffset(topicPartitionOffset);
            }
            processor.SaveCommittedOffsets(offsets);
            _logger.LogInformation($"Committed: {offsets.FormatOffsets()}");
        }
        catch (KafkaException e)
        {
            _logger.LogInformation($"Unable to commit offsets. Reason: {e.Error.Reason}\nOffsets: {offsets.FormatOffsets()}");
            throw;
        }
    }
    catch (Exception e)
    {
        _logger.LogError($"Unable to commit offsets: {offsets.FormatOffsets()}", e);
    }
}
```
You enabled auto-commit, so receiving duplicates is expected. You can read more here.
BTW, I don't think you need to store the offsets manually, since you aren't doing anything different from what automatic offset storing already does.
I tried disabling auto-commit and committing manually from a different thread using `consumer.CommitOffsets(offsets);`, but I still get a duplicate message when the consumer restarts. I don't want duplicate messages to be processed. How can I solve this?
I didn't look at your code, but this behavior is often due to a misunderstanding of what committed offsets represent: a committed offset is interpreted as the _next message to consume_, i.e. the offset of the last consumed message + 1.
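To make the +1 concrete, here is a toy in-memory model of one partition holding the three messages from the question at offsets 0 to 2. It is not the Confluent.Kafka API; `partitionLog`, `ReadAfterRestart`, and `Lag` are hypothetical names used only for illustration. It shows that committing the offset of the last consumed message (2) re-delivers that message after a restart and leaves a lag of 1, which matches the reported symptom, while committing 3 does neither.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model of a single partition with 3 messages (offsets 0, 1, 2).
// Hypothetical names; this only illustrates how a committed offset is interpreted.
string[] partitionLog = { "m0", "m1", "m2" };
long logEndOffset = partitionLog.Length; // the offset the next produced message would get

// After a restart, the consumer resumes AT the committed offset:
// the first message it reads is the one stored at that offset.
List<string> ReadAfterRestart(long committedOffset) =>
    partitionLog.Skip((int)committedOffset).ToList();

// Lag = distance between the end of the log and the committed offset.
long Lag(long committedOffset) => logEndOffset - committedOffset;

long lastConsumed = 2; // offset of "m2", the last message processed

// Committing the last consumed offset: "m2" is delivered again, lag stays at 1.
Console.WriteLine($"commit {lastConsumed}: re-read [{string.Join(", ", ReadAfterRestart(lastConsumed))}], lag {Lag(lastConsumed)}");

// Committing last consumed + 1 (the next message to consume): nothing is re-read, lag is 0.
Console.WriteLine($"commit {lastConsumed + 1}: re-read [{string.Join(", ", ReadAfterRestart(lastConsumed + 1))}], lag {Lag(lastConsumed + 1)}");
```

This prints `commit 2: re-read [m2], lag 1` followed by `commit 3: re-read [], lag 0`.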
I'm experiencing a similar issue. Everything works fine when each message is handled separately and its offset is committed on its own.
But when, for example, I handle a batch of 100 messages and commit only the highest offset per partition, the issue appears.
Flow: handling a batch mostly takes under 10 seconds.
Consumer settings:

- `EnableAutoCommit = false`
- `AutoOffsetReset = AutoOffsetReset.Earliest`
- `EnablePartitionEof = true`
- `HeartbeatIntervalMs` = 10 secs
- `SessionTimeoutMs` = 5 mins
- `MaxPollIntervalMs` = 2 hours
- `BootstrapServers = "servers"`
To commit multiple messages I use this method:

```csharp
void Commit(IEnumerable<TopicPartitionOffset> offsets);
```
I get no exception. I call the commit inside a while loop with a try/catch, and if an exception is thrown I retry committing the same offsets.
I also tried committing every message separately in a `foreach`, using this method:

```csharp
void Commit(ConsumeResult<TKey, TValue> result);
```
The same thing happened.
Only when messages are handled and committed one at a time does an app restart not repeat the last message.
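For a batch, the rule from the earlier reply applies per partition: commit the highest processed offset + 1, not the highest offset itself. Below is a minimal sketch, assuming batch entries are plain `(Topic, Partition, Offset)` tuples standing in for the data carried by `ConsumeResult`; `OffsetsToCommit` and the topic name `orders` are hypothetical. With Confluent.Kafka, each resulting entry would then be wrapped in a `TopicPartitionOffset` and passed to `Commit(IEnumerable<TopicPartitionOffset>)`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: for each partition in a processed batch, return the offset
// to commit, i.e. the highest processed offset + 1 (the next message to consume).
List<(string Topic, int Partition, long Offset)> OffsetsToCommit(
    IEnumerable<(string Topic, int Partition, long Offset)> batch) =>
    batch
        .GroupBy(m => (m.Topic, m.Partition))
        .Select(g => (Topic: g.Key.Topic, Partition: g.Key.Partition, Offset: g.Max(m => m.Offset) + 1))
        .OrderBy(o => o.Topic).ThenBy(o => o.Partition)
        .ToList();

// Usage: a small batch spread over two partitions of a hypothetical "orders" topic.
var batch = new List<(string Topic, int Partition, long Offset)>
{
    ("orders", 0, 5), ("orders", 1, 7), ("orders", 0, 6), ("orders", 1, 8),
};

foreach (var o in OffsetsToCommit(batch))
    Console.WriteLine($"{o.Topic} [{o.Partition}] commit {o.Offset}");
// orders [0] commit 7
// orders [1] commit 9
```

Grouping first and adding 1 once per partition keeps the commit to a single offset per partition, which is what committing "only the highest offset per partition" intends.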
How are you getting the offsets in `var offsets = processor.GetOffsetsToCommit();`?
You need to increment the offset manually if you use the `StoreOffset(TopicPartitionOffset)` method; the `StoreOffset(ConsumeResult)` overload adds the +1 for you.
Have a look at https://github.com/confluentinc/confluent-kafka-dotnet/blob/25f320a672b4324d732304cb4efa2288867b320c/src/Confluent.Kafka/Consumer.cs#L338
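Applied to the code in the question, the +1 belongs wherever the offsets to store are produced. The sketch below is a hypothetical, thread-safe replacement for `processor.GetOffsetsToCommit()`: the consume loop calls `MarkProcessed`, and the commit thread calls `TakeOffsetsToStore`, which returns offset + 1 per partition and an empty list when there is nothing new to commit. It assumes messages within a partition are processed in order. All names are illustrative, and each entry would still have to be wrapped in a `TopicPartitionOffset` before being passed to `StoreOffset` or `Commit`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical tracker: highest processed offset per (topic, partition),
// shared between the consume loop and the commit thread.
var pending = new Dictionary<(string Topic, int Partition), long>();
var gate = new object();

// Called by the consume loop after a message has been processed successfully.
void MarkProcessed(string topic, int partition, long offset)
{
    lock (gate)
    {
        var key = (topic, partition);
        // Keep only the highest processed offset per partition.
        if (!pending.TryGetValue(key, out var current) || offset > current)
            pending[key] = offset;
    }
}

// Called by the commit thread. Returns the offsets to STORE/COMMIT, already
// converted to "next message to consume" (+1), and clears the pending set.
List<(string Topic, int Partition, long Offset)> TakeOffsetsToStore()
{
    lock (gate)
    {
        var result = pending
            .Select(kv => (Topic: kv.Key.Topic, Partition: kv.Key.Partition, Offset: kv.Value + 1))
            .OrderBy(o => o.Topic).ThenBy(o => o.Partition)
            .ToList();
        pending.Clear();
        return result;
    }
}

// Usage: the three messages of one partition from the question.
MarkProcessed("orders", 0, 0);
MarkProcessed("orders", 0, 1);
MarkProcessed("orders", 0, 2);
foreach (var o in TakeOffsetsToStore())
    Console.WriteLine($"{o.Topic} [{o.Partition}] store {o.Offset}"); // orders [0] store 3
Console.WriteLine(TakeOffsetsToStore().Count); // 0: nothing new to commit
```

Clearing the pending set on each drain gives the "only if there is something to commit" behavior from the question without re-sending offsets that were already handed off.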
This was exactly my problem. Setting the committed offset to the last consumed offset + 1, as @mhowlett explained, fixed it. Thanks!