I want to understand why Commit() is required (whether automatic or manual) after reading messages off a topic (using either Poll() or Consume()).
Below is my modified .NET code, which uses polling combined with a manual CommitAsync(). It is based on this example: https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/AdvancedConsumer/Program.cs
public static void Run_PollWithManualCommit(string brokerList, List<string> topics)
{
using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
// Note: All event handlers are called on the main thread.
consumer.OnMessage += (_, msg)
=>
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
Console.WriteLine($"Committing offset");
var committedOffsets = consumer.CommitAsync(msg).Result;
Console.WriteLine($"Committed offset: [{string.Join(", ", committedOffsets.Offsets)}]");
};
consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
// Raised on critical errors, e.g. connection failures or all brokers down.
consumer.OnError += (_, error)
=> Console.WriteLine($"Error: {error}");
// Raised on deserialization errors or when a consumed message has an error != NoError.
consumer.OnConsumeError += (_, msg)
=> Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");
//this is NOT called, when autocommit is disabled
consumer.OnOffsetsCommitted += (_, commit) =>
{
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");
if (commit.Error)
{
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
}
else
{
// Only report success when the commit did not fail.
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
}
};
consumer.OnPartitionsAssigned += (_, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
consumer.Assign(partitions);
};
consumer.OnPartitionsRevoked += (_, partitions) =>
{
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
consumer.Unassign();
};
//consumer.OnStatistics += (_, json)
// => Console.WriteLine($"Statistics: {json}");
//The subscribe() method controls which topics will be fetched in poll.
consumer.Subscribe(topics);
Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
while (!cancelled)
{
consumer.Poll(TimeSpan.FromMilliseconds(100));
}
}
}
private static Dictionary<string, object> constructConfig(string brokerList, bool enableAutoCommit) =>
new Dictionary<string, object>
{
{ "group.id", "advanced-csharp-consumer" },
{ "enable.auto.commit", enableAutoCommit },
{ "auto.commit.interval.ms", 5000 },
{ "statistics.interval.ms", 60000 },
{ "bootstrap.servers", brokerList },
{ "default.topic.config", new Dictionary<string, object>()
{
{ "auto.offset.reset", "smallest" }
}
}
};
I commented out the lines as shown below and ran the consumer again; it still remembers the correct offset. That is, the correct offset is used with Poll() and auto-commit disabled, even without calling CommitAsync().
consumer.OnMessage += (_, msg)
=>
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
//Console.WriteLine($"Committing offset");
// var committedOffsets = consumer.CommitAsync(msg).Result;
// Console.WriteLine($"Committed offset: [{string.Join(", ", committedOffsets.Offsets)}]");
};
For example, here is my test:
Step 1: Run the consumer to consume one message.
Step 2: Kill it.
Step 3: Restart it.
Step 4: It still remembers the correct offset after each restart, as shown below.
D:\myStudio2\Kafka\confluent-kafka-dotnet\examples\AdvancedConsumer>dotnet run PollWithManualCommit localhost:9092 Advanced
Subscribed to: [Advanced]
Ctrl-C to exit.
Assigned partitions: [Advanced [0]], member id: rdkafka-e4310a50-4621-4c6b-9a26-73fc984b7072
Topic: Advanced Partition: 0 Offset: 0 hello
Reached end of topic Advanced partition 0, next message will be at offset 1
Below is the output of running the producer, which shows offset 0:
D:\myStudio2\Kafka\confluent-kafka-dotnet\examples\AdvancedProducer>dotnet run localhost:9092 Advanced
-----------------------------------------------------------------------
Producer rdkafka#producer-1 producing on topic Advanced.
-----------------------------------------------------------------------
To create a kafka message with UTF-8 encoded key/value message:
> key value<Enter>
To create a kafka message with empty key and UTF-8 encoded value:
> value<enter>
Ctrl-C to quit.
> hello
Partition: 0, Offset: 0
>
I am trying to avoid Consume() because it might be deprecated, according to this API doc: https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html#Confluent_Kafka_Consumer_Consume_Confluent_Kafka_Message__System_Int32_
My question is: what is the purpose of CommitAsync()? Even when the method is NOT called, Run_PollWithManualCommit() remembers the last offset, as demonstrated in my opening post.
The problem is that I cannot find good documentation on the Kafka .NET client from either Kafka or Confluent.
When I run the example code you provided, the consumer does not store the latest offset: each time the program runs, it re-reads the messages. This behavior is also verified in a number of integration tests. The purpose of CommitAsync is to commit the current consumer read position.
Yes, we need better documentation. We're sorry about that; it's coming.
Hi @mhowlett,
Sorry, I did not mean to blame anyone. I just want to understand CommitAsync() in the .NET client.
Even when CommitAsync() is called, the consumer application re-reads all the messages from offset 0 up to the offset of the last message when it starts.
Below is the result. The last offset is 6, so at startup it re-reads all messages until it prints the line below:
Reached end of topic Advanced partition 0, next message will be at offset 6
Then one new message is sent to the topic via the producer and read from the topic, and the following is displayed:
Topic: Advanced Partition: 0 Offset: 6 World6
Committing offset
Committed offset: [Advanced [0] @7: Success]
Reached end of topic Advanced partition 0, next message will be at offset 7
Below are the details:
D:\myStudio2\Kafka\confluent-kafka-dotnet\examples\AdvancedConsumer>dotnet run PollWithManualCommit localhost:9092 Advanced
Subscribed to: [Advanced]
Ctrl-C to exit.
Assigned partitions: [Advanced [0]], member id: rdkafka-27a8d0f4-62dd-4e87-907c-904c9d916c34
Topic: Advanced Partition: 0 Offset: 0 World1
Committing offset
Committed offset: [Advanced [0] @1: Success]
Topic: Advanced Partition: 0 Offset: 1 World2
Committing offset
Committed offset: [Advanced [0] @2: Success]
Topic: Advanced Partition: 0 Offset: 2 World3
Committing offset
Committed offset: [Advanced [0] @3: Success]
Topic: Advanced Partition: 0 Offset: 3 World3
Committing offset
Committed offset: [Advanced [0] @4: Success]
Topic: Advanced Partition: 0 Offset: 4 World4
Committing offset
Committed offset: [Advanced [0] @5: Success]
Topic: Advanced Partition: 0 Offset: 5 World5
Committing offset
Committed offset: [Advanced [0] @6: Success]
Reached end of topic Advanced partition 0, next message will be at offset 6
Topic: Advanced Partition: 0 Offset: 6 World6
Committing offset
Committed offset: [Advanced [0] @7: Success]
Reached end of topic Advanced partition 0, next message will be at offset 7
Thus, calling and not calling CommitAsync produce the same result. Am I missing something here?
Sorry, I'm having trouble following.
CommitAsync only has an effect when you restart the consumer. Calling CommitAsync stores in Kafka the current consumer position for the partitions it is reading from. In the event of a consumer failure, the new consumer that picks up the failed consumer's partitions will start reading from the last stored offsets.
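The behavior described in the previous comment can be sketched with a small in-memory model. This is only an illustration under stated assumptions: `CommittedOffsetsModel`, `Commit`, and `StartOffset` are hypothetical names, not part of Confluent.Kafka, and the dictionary merely stands in for the offsets Kafka stores for one consumer group.

```csharp
using System.Collections.Generic;

// Toy model of committed offsets for ONE consumer group. Hypothetical names throughout;
// this is not the Confluent.Kafka API and it never talks to a broker.
public sealed class CommittedOffsetsModel
{
    private readonly Dictionary<string, long> _committed = new Dictionary<string, long>();

    // Record that `consumedOffset` has been processed. As in the logs in this thread
    // (offset 0 consumed, "@1" committed), the stored value is the NEXT offset to read.
    public void Commit(string topic, int partition, long consumedOffset)
    {
        _committed[Key(topic, partition)] = consumedOffset + 1;
    }

    // Where a restarted consumer begins: the committed position if there is one,
    // otherwise whatever auto.offset.reset resolves to (passed in as resetOffset).
    public long StartOffset(string topic, int partition, long resetOffset)
    {
        long next;
        return _committed.TryGetValue(Key(topic, partition), out next) ? next : resetOffset;
    }

    private static string Key(string topic, int partition)
    {
        return topic + "[" + partition + "]";
    }
}
```

In this model, a group that has committed offset 5 of `Advanced [0]` restarts at 6, while a group that never committed restarts at `resetOffset` (0 when auto.offset.reset is "smallest"), which is the re-read from offset 0 described earlier in the thread.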
I meant that the result is the same whether CommitAsync is called or not, because in both cases the last offset is correct when the consumer restarts. That is why I am confused.
I am unable to reproduce this. Are you sure you're not accidentally auto-committing?
I've manually checked a number of variations of committing and not committing.
The result of my code has been described in my previous comments.
Here is the description again.
When CommitAsync is NOT called from Run_PollWithManualCommit (i.e., it is removed or commented out):
1 When the consumer starts, it reads all messages from offset 0 to the last offset N, which were already read in the previous session
2 Post a new message via the producer, and the consumer displays the new message with offset N+1
3 Kill the consumer
4 Start the consumer; it displays all messages from offset 0 to N+1
The result when CommitAsync is called is the same as above.
That is why I am confused.
I'll re-verify on a Windows machine tomorrow (this is inconvenient at present). When I run this code on macOS, offsets are committed correctly. You might like to try version 0.11.3-ci-303, which references a more recent build of librdkafka.
Please also post the full program you are testing with. Paste the full code both with and without CommitAsync being called (so that I'm sure I'm running exactly the same thing you are). Thanks!
I already use the latest version of the Confluent .NET client.
Please find the code below: consumer and producer.
Consumer below. Please run either of these modes to verify:
PollNoCommit
PollWithManualCommit
namespace Confluent.Kafka.Examples.AdvancedConsumer
{
public class Program
{
private static Dictionary<string, object> constructConfig(string brokerList, bool enableAutoCommit) =>
new Dictionary<string, object>
{
{ "group.id", Guid.NewGuid() },
{ "enable.auto.commit", enableAutoCommit },
{ "auto.commit.interval.ms", 5000 },
{ "statistics.interval.ms", 60000 },
{ "bootstrap.servers", brokerList },
{ "default.topic.config", new Dictionary<string, object>()
{
{ "auto.offset.reset", "smallest" }
}
}
};
/// <summary>
/// In this example:
/// - offsets are auto committed.
/// - consumer.Poll / OnMessage is used to consume messages.
/// - no extra thread is created for the Poll loop.
/// </summary>
public static void Run_Poll(string brokerList, List<string> topics)
{
using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, true), null, new StringDeserializer(Encoding.UTF8)))
{
// Note: All event handlers are called on the main thread.
consumer.OnMessage += (_, msg)
=>
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
};
consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
// Raised on critical errors, e.g. connection failures or all brokers down.
consumer.OnError += (_, error)
=> Console.WriteLine($"Error: {error}");
// Raised on deserialization errors or when a consumed message has an error != NoError.
consumer.OnConsumeError += (_, msg)
=> Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");
consumer.OnOffsetsCommitted += (_, commit) =>
{
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");
if (commit.Error)
{
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
}
else
{
// Only report success when the commit did not fail.
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
}
};
consumer.OnPartitionsAssigned += (_, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
consumer.Assign(partitions);
};
consumer.OnPartitionsRevoked += (_, partitions) =>
{
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
consumer.Unassign();
};
//consumer.OnStatistics += (_, json)
// => Console.WriteLine($"Statistics: {json}");
//The subscribe() method controls which topics will be fetched in poll.
consumer.Subscribe(topics);
Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
while (!cancelled)
{
consumer.Poll(TimeSpan.FromMilliseconds(100));
}
}
}
public static void Run_PollWithManualCommit(string brokerList, List<string> topics)
{
using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
// Note: All event handlers are called on the main thread.
consumer.OnMessage += (_, msg)
=>
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
Console.WriteLine($" Committing offset");
var committedOffsets = consumer.CommitAsync(msg).Result;
Console.WriteLine($" Committed offset: [{string.Join(", ", committedOffsets.Offsets)}]");
};
consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($" Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
// Raised on critical errors, e.g. connection failures or all brokers down.
consumer.OnError += (_, error)
=> Console.WriteLine($"Error: {error}");
// Raised on deserialization errors or when a consumed message has an error != NoError.
consumer.OnConsumeError += (_, msg)
=> Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");
//this is NOT called, when autocommit is disabled
consumer.OnOffsetsCommitted += (_, commit) =>
{
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");
if (commit.Error)
{
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
}
else
{
// Only report success when the commit did not fail.
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
}
};
consumer.OnPartitionsAssigned += (_, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
consumer.Assign(partitions);
};
consumer.OnPartitionsRevoked += (_, partitions) =>
{
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
consumer.Unassign();
};
//consumer.OnStatistics += (_, json)
// => Console.WriteLine($"Statistics: {json}");
//The subscribe() method controls which topics will be fetched in poll.
consumer.Subscribe(topics);
Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
while (!cancelled)
{
consumer.Poll(TimeSpan.FromMilliseconds(100));
}
}
}
public static void PollNoCommit(string brokerList, List<string> topics)
{
using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
// Note: All event handlers are called on the main thread.
consumer.OnMessage += (_, msg)
=>
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
//Console.WriteLine($"Committing offset");
//var committedOffsets = consumer.CommitAsync(msg).Result;
//Console.WriteLine($"Committed offset: [{string.Join(", ", committedOffsets.Offsets)}]");
};
consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($" Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
// Raised on critical errors, e.g. connection failures or all brokers down.
consumer.OnError += (_, error)
=> Console.WriteLine($"Error: {error}");
// Raised on deserialization errors or when a consumed message has an error != NoError.
consumer.OnConsumeError += (_, msg)
=> Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");
//this is NOT called, when autocommit is disabled
consumer.OnOffsetsCommitted += (_, commit) =>
{
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");
if (commit.Error)
{
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
}
else
{
// Only report success when the commit did not fail.
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
}
};
consumer.OnPartitionsAssigned += (_, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
consumer.Assign(partitions);
};
consumer.OnPartitionsRevoked += (_, partitions) =>
{
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
consumer.Unassign();
};
//consumer.OnStatistics += (_, json)
// => Console.WriteLine($"Statistics: {json}");
//The subscribe() method controls which topics will be fetched in poll.
consumer.Subscribe(topics);
Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
Console.WriteLine("Ctrl-C to exit.");
while (!cancelled)
{
consumer.Poll(TimeSpan.FromMilliseconds(100));
}
}
}
/// <summary>
/// In this example
/// - offsets are manually committed.
/// - consumer.Consume is used to consume messages.
/// (all other events are still handled by event handlers)
/// - no extra thread is created for the Poll (Consume) loop.
/// </summary>
public static void Run_Consume(string brokerList, List<string> topics)
{
using (var consumer = new Consumer<Ignore, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
// Note: All event handlers are called on the main thread.
consumer.OnPartitionEOF += (_, end)
=> Console.WriteLine($" Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
consumer.OnError += (_, error)
=> Console.WriteLine($"Error: {error}");
consumer.OnConsumeError += (_, error)
=> Console.WriteLine($"Consume error: {error}");
consumer.OnPartitionsAssigned += (_, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
consumer.Assign(partitions);
};
consumer.OnPartitionsRevoked += (_, partitions) =>
{
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
consumer.Unassign();
};
//for Poll only
//consumer.OnMessage += (_, msg) =>
//{
// Console.WriteLine($"Message value: {msg.Value}");
//};
//consumer.OnStatistics += (_, json)
// => Console.WriteLine($"Statistics: {json}");
consumer.Subscribe(topics);
Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");
var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
while (!cancelled)
{
Message<Ignore, string> msg;
if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
{
continue;
}
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
if (msg.Offset % 2 == 0)
{
Console.WriteLine($" Committing offset");
var committedOffsets = consumer.CommitAsync(msg).Result;
Console.WriteLine($" Committed offset: {committedOffsets}");
}
}
}
}
private static void PrintUsage()
=> Console.WriteLine("Usage: .. <poll|PollWithManualCommit|PollNoCommit|consume> <broker,broker,..> <topic> [topic..]");
public static void Main(string[] args)
{
if (args.Length < 3)
{
PrintUsage();
return;
}
var mode = args[0];
var brokerList = args[1];
var topics = args.Skip(2).ToList();
switch (mode)
{
case "poll":
Run_Poll(brokerList, topics);
break;
case "PollWithManualCommit":
Run_PollWithManualCommit(brokerList, topics);
break;
case "PollNoCommit":
Run_PollWithManualCommit(brokerList, topics);
break;
case "consume":
Run_Consume(brokerList, topics);
break;
default:
PrintUsage();
break;
}
}
}
}
Producer
namespace Confluent.Kafka.Examples.AdvancedProducer
{
public class Program
{
public static void Main(string[] args)
{
if (args.Length != 2)
{
Console.WriteLine("Usage: .. brokerList topicName");
return;
}
string brokerList = args[0];
string topicName = args[1];
/*
// TODO(mhowlett): allow partitioner to be set.
var topicConfig = new TopicConfig
{
CustomPartitioner = (top, key, cnt) =>
{
var kt = (key != null) ? Encoding.UTF8.GetString(key, 0, key.Length) : "(null)";
int partition = (key?.Length ?? 0) % cnt;
bool available = top.PartitionAvailable(partition);
Console.WriteLine($"Partitioner topic: {top.Name} key: {kt} partition count: {cnt} -> {partition} {available}");
return partition;
}
};
*/
var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };
using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8)))
{
Console.WriteLine("\n-----------------------------------------------------------------------");
Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
Console.WriteLine("-----------------------------------------------------------------------");
Console.WriteLine("To create a kafka message with UTF-8 encoded key/value message:");
Console.WriteLine("> key value<Enter>");
Console.WriteLine("To create a kafka message with empty key and UTF-8 encoded value:");
Console.WriteLine("> value<enter>");
Console.WriteLine("Ctrl-C to quit.\n");
var cancelled = false;
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cancelled = true;
};
while (!cancelled)
{
Console.Write("> ");
string text;
try
{
text = Console.ReadLine();
}
catch (IOException)
{
// IO exception is thrown when ConsoleCancelEventArgs.Cancel == true.
break;
}
if (text == null)
{
// Console returned null before
// the CancelKeyPress was treated
break;
}
var key = "";
var val = text;
// split line if both key and value specified.
int index = text.IndexOf(" ");
if (index != -1)
{
key = text.Substring(0, index);
val = text.Substring(index + 1);
}
var deliveryReport = producer.ProduceAsync(topicName, key, val);
var result = deliveryReport.Result; // synchronously waits for message to be produced.
Console.WriteLine($"Partition: {result.Partition}, Offset: {result.Offset}");
}
}
}
}
}
You are calling Run_PollWithManualCommit in both cases:
case "PollWithManualCommit":
Run_PollWithManualCommit(brokerList, topics);
break;
case "PollNoCommit":
Run_PollWithManualCommit(brokerList, topics);
break;
Please change it to below. The issue still applies.
case "PollWithManualCommit":
Run_PollWithManualCommit(brokerList, topics);
break;
case "PollNoCommit":
PollNoCommit(brokerList, topics);
break;
The second time you pasted the code, you set group.id to a new GUID on every run. This means that each time you run the program, any earlier committed offsets belong to a different consumer group and so do not apply to the current execution.
By contrast, the first time you pasted the code you set group.id to advanced-csharp-consumer, so that code should produce the behavior you are expecting.
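The point about group.id can be illustrated with a short simulation. It is a sketch under assumptions, not the client's implementation: `GroupIdModel` and `RunOnce` are hypothetical names, a single partition is assumed, and the dictionary plays the role of the per-group committed offsets kept by Kafka.

```csharp
using System.Collections.Generic;

// Hypothetical simulation of how committed offsets are looked up per consumer group.
// Not the Confluent.Kafka API; one partition only, no broker involved.
public static class GroupIdModel
{
    // Simulates one run of a consumer that reads to the end of the partition and
    // commits as it goes. Returns the offset this run STARTED reading from.
    public static long RunOnce(IDictionary<string, long> committedByGroup, string groupId, long logEndOffset)
    {
        long next;
        // No commit for this group yet: fall back to offset 0 ("smallest").
        long start = committedByGroup.TryGetValue(groupId, out next) ? next : 0;

        // After reading everything, the committed position is the log end offset.
        committedByGroup[groupId] = logEndOffset;
        return start;
    }
}
```

With one shared dictionary, two runs using the fixed id "advanced-csharp-consumer" and log end offsets 6 and then 7 start at 0 and then 6. Two runs that each pass `Guid.NewGuid().ToString()` both start at 0, because the second run looks up a group that has never committed anything.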
No. The issue still exists.
I meant: what is the purpose of CommitAsync? It is not doing what it is meant to do here.
Again, if you disable CommitAsync in Run_Consume() from here, the result is the same as when CommitAsync is enabled. That is, it still remembers the last offset of the message that has been read off the topic.
public static void Run_Consume(string brokerList, List<string> topics)
{
...
//if (msg.Offset % 5 == 0)
//{
// Console.WriteLine($"Committing offset");
// var committedOffsets = consumer.CommitAsync(msg).Result;
// Console.WriteLine($"Committed offset: {committedOffsets}");
//}
}
}
}
Then run the test case outlined in my previous comment again:
1 When the consumer starts, it reads all messages from offset 0 to the last offset N, which were already read in the previous session
2 Post a new message via the producer, and the consumer displays the new message with offset N+1
3 Kill the consumer
4 Start the consumer; it displays all messages from offset 0 to N+1
Based on what you said, it should not display N+1, because CommitAsync() is NOT called.
What you describe is expected behavior. CommitAsync will have no effect in your scenario because you are using a different consumer group on each run of your program (at least I think so; you provided two variants of your program, one where you do and one where you don't, so I'm just guessing which one you're actually running). Anyway, this is core functionality that is well tested, so it's unlikely there is any issue.
I am running the one on your GitHub, with CommitAsync disabled as clearly shown in my changes.
The program above always uses the same consumer group, with "group.id": "advanced-csharp-consumer". How do you explain this?
You said: "this is core functionality that is well tested, so it's unlikely there is any issue."
But CommitAsync is NOT doing what it is supposed to do.
@PingPongSet Regardless of the issue or non-issue, I am fairly certain pull requests are welcome.
Reproduce with the "debug" config property set to "cgrp,topic,fetch" and provide the logs.
I have the same problem: every time I restart my application, committed messages are processed again.
Here is my code.
public static class KafkaConsumer
{
private static bool _started;
public static event Action<string, string> OnNewMessage;
public static void Start(IEnumerable<string> topics)
{
if (_started)
{
return;
}
_started = true;
var kafkaServer = ConfigurationManager.AppSettings["kafka:server"];
var config = new Dictionary<string, Object>
{
{ "bootstrap.servers", kafkaServer },
{ "group.id", "consumer" },
{ "auto.offset.reset", "latest" },
{ "enable.auto.commit", "false" },
};
Task.Run(() =>
{
using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
var partitionsOffset = topics.Select(topic => new TopicPartitionOffset(topic, 0, 0));
consumer.Assign(partitionsOffset);
consumer.OnError += (_, error)
=> Console.WriteLine($"Error: {error}");
consumer.OnConsumeError += (_, error)
=> Console.WriteLine($"Consume error: {error}");
while (true)
{
Message<Ignore, string> msg;
if (consumer.Consume(out msg, TimeSpan.FromSeconds(1)))
{
OnNewMessage(msg.Topic, msg.Value);
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
var commitedOffset = consumer.CommitAsync(msg).Result;
}
}
}
});
}
}
Am I missing some config or anything else?
@brunneus - your consumer.Assign call sets the consumption offset to 0. Try passing a List<TopicPartition> to Assign, not List<TopicPartitionOffset>, or set Offset to Offset.Invalid.
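The reason an explicit offset in Assign defeats the commits can be sketched as a precedence rule. This is a simplified model under assumptions, not the library's code: `StartPositionModel`, `Resolve`, and the sentinel constant are hypothetical names, with the sentinel standing in for the role that Offset.Invalid plays in the reply above.

```csharp
// Hypothetical model of how a consumer's starting position is chosen for one partition.
// Not the Confluent.Kafka or librdkafka implementation; names and the sentinel value
// are assumptions made for illustration.
public static class StartPositionModel
{
    // Stand-in for "no explicit offset was given in Assign".
    public const long NoExplicitOffset = -1001;

    public static long Resolve(long assignedOffset, long? committedOffset, long resetOffset)
    {
        // 1. An explicit offset passed to Assign wins, even if commits exist.
        if (assignedOffset != NoExplicitOffset)
        {
            return assignedOffset;
        }

        // 2. Otherwise use the group's committed offset, if any.
        // 3. Otherwise fall back to what auto.offset.reset resolves to.
        return committedOffset ?? resetOffset;
    }
}
```

In this model, assigning offset 0 with a committed offset of 42 still starts at 0, which matches the re-processing reported above. Passing the sentinel instead resumes at 42. In the posted code that would presumably mean building `new TopicPartition(topic, 0)` for each topic rather than `new TopicPartitionOffset(topic, 0, 0)`, as the reply suggests.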
@mhowlett Thank you very much.