Scenario:
I have a producer that pumps data to a topic. I then have a consumer that reads those messages, but there will be special instances in which the consumer is stopped for various reasons.
Requirement:
Is it possible for a consumer to read and commit only the messages that were sent after the consumer started running? What would happen to the earlier messages that were sent to the topic?
Edit: For more clarity.
With a group id, manual commit and custom assign, you should be able to. I don't quite understand the scenario, though.
How many consumers are there and are they in the same group id?
If a consumer stops, what should happen when you restart it, given that the producer may have sent zero or more messages while it was restarting?
There is only one consumer and one producer, and they will be in the same group id.
If the consumer stops and is restarted, it must only pick up the messages that arrive on the topic from the moment the consumer has restarted.
A producer has no group id. The group id exists only for consumer management, both to dispatch messages across different consumers and to manage offsets when a consumer stops and restarts.
In your case, you seem to want to start consuming at the end of each partition (only consume messages produced after the consumer started; messages produced during downtime will not be processed). You can either:
For both subscribe solutions, you will have to disable auto commit (you don't seem to want to commit offsets, as you don't care where you stopped).
Code sample for second solution:
```c#
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

// brokerList is assumed to be defined elsewhere (the broker address list).
var config = new Dictionary<string, object>
{
    // A fresh group id on every start means no committed offsets exist,
    // so auto.offset.reset decides where consumption begins.
    ["group.id"] = Guid.NewGuid().ToString(),
    ["bootstrap.servers"] = brokerList,
    ["enable.auto.commit"] = false,
    ["auto.offset.reset"] = "latest", //default
};

using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
    consumer.OnMessage += (_, msg)
        => Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
    consumer.OnPartitionEOF += (_, end)
        => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
    consumer.OnError += (_, error)
        => Console.WriteLine($"Error: {error}");
    consumer.OnConsumeError += (_, msg)
        => Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");

    consumer.Subscribe("topic");

    bool cancelled = false;
    while (!cancelled)
    {
        // Poll dispatches the events registered above.
        consumer.Poll(TimeSpan.FromMilliseconds(100));
    }
}
```
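If you would rather keep a fixed group id, starting at the end of each partition can also be sketched with a custom assign. This is only a minimal sketch, assuming the same 0.11.x event-based API as the sample above. The group id `"my-fixed-group"` is a hypothetical name, and `brokerList` is again assumed to be defined elsewhere.

```c#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;

var config = new Dictionary<string, object>
{
    ["group.id"] = "my-fixed-group",      // hypothetical, stable across restarts
    ["bootstrap.servers"] = brokerList,   // assumed to be defined elsewhere
    ["enable.auto.commit"] = false,
};

using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
    // When the group assigns partitions, override the start position:
    // Offset.End skips everything already in the partition.
    consumer.OnPartitionsAssigned += (_, partitions)
        => consumer.Assign(partitions.Select(p => new TopicPartitionOffset(p, Offset.End)));

    // Once a handler is registered for assignment, revocation has to be handled too.
    consumer.OnPartitionsRevoked += (_, partitions)
        => consumer.Unassign();

    consumer.OnMessage += (_, msg)
        => Console.WriteLine($"Offset: {msg.Offset} {msg.Value}");

    consumer.Subscribe("topic");

    while (true)
    {
        consumer.Poll(TimeSpan.FromMilliseconds(100));
    }
}
```

Because nothing is committed and every assignment starts at `Offset.End`, messages produced while the consumer was down are skipped, which matches the requirement stated in the question.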