Confluent-kafka-dotnet: [Question] Consuming and Commiting Messages

Created on 1 Jun 2017  路  3Comments  路  Source: confluentinc/confluent-kafka-dotnet

Scenario:
I have a producer that pumps data to a topic. I then have a consumer that reads that message but there will be special instances wherein the consumer will be stopped for various reasons.

Requirement:
Is it possible for a consumer to just read and commit the messages that were sent on the time that the consumer has started running? What would happen to the previous messages that were sent to the topic?

Edit: For more clarity.

question

Most helpful comment

Producer has no groupid - it's only for consumer management, both to dispatch messages to different consumers and/or to manage offset when consumer stop/restart

In your case, you seem to want to start consuming at end of partition (only consume messages produced after consumer started, there might be messages that are not processed if messages where produced during downtime). You can either:

  • Use Assign if you know the partitioning of the topic. Note that if partitions are added dynamically, the connsumer won't catch them automatically (only with subscribe).
  • Use subscribe with a random group.id (new GuiId() by example) and use Subscribe. This will create a new consumer group which will begin at end of topics by default.
  • Use subscribe with a defined group.id, and use OnPartitionAssigned

For both subscribe solution, you will have to disable auto commit (you don't seem to want to commit offsets as you don't care where you stopped)

Code sample for second solution:
```c#
var config = new Dictionary
{
["group.id"] = Guid.NewGuid().ToString(),
["bootstrap.servers"] = brokerList,
["enable.auto.commit"] = false,
["auto.offset.reset"] = "latest", //default
};

        using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
            consumer.OnMessage += (_, msg)
                => Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");

            consumer.OnPartitionEOF += (_, end)
                => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

            consumer.OnError += (_, error)
                => Console.WriteLine($"Error: {error}");

            consumer.OnConsumeError += (_, msg)
                => Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");

            consumer.Subscribe("topic");

            bool cancelled = false;
            while (!cancelled)
            {
                consumer.Poll(TimeSpan.FromMilliseconds(100));
            }
        }

```

All 3 comments

With groupid, manual commit and custom assign, you should. I don't quite understand the scenario.
How many consumers are there and are they in the same group id?
If a consumer stop, what should happen when you restart it, given the producer sent 0 or some messages while restarting?

There is only one consumer and producer and they will be in the same group id.

If a consumer stop and has restarted, it must only pick up the messages that were dropped to the topic the same time the consumer has restarted.

Producer has no groupid - it's only for consumer management, both to dispatch messages to different consumers and/or to manage offset when consumer stop/restart

In your case, you seem to want to start consuming at end of partition (only consume messages produced after consumer started, there might be messages that are not processed if messages where produced during downtime). You can either:

  • Use Assign if you know the partitioning of the topic. Note that if partitions are added dynamically, the connsumer won't catch them automatically (only with subscribe).
  • Use subscribe with a random group.id (new GuiId() by example) and use Subscribe. This will create a new consumer group which will begin at end of topics by default.
  • Use subscribe with a defined group.id, and use OnPartitionAssigned

For both subscribe solution, you will have to disable auto commit (you don't seem to want to commit offsets as you don't care where you stopped)

Code sample for second solution:
```c#
var config = new Dictionary
{
["group.id"] = Guid.NewGuid().ToString(),
["bootstrap.servers"] = brokerList,
["enable.auto.commit"] = false,
["auto.offset.reset"] = "latest", //default
};

        using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
            consumer.OnMessage += (_, msg)
                => Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");

            consumer.OnPartitionEOF += (_, end)
                => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

            consumer.OnError += (_, error)
                => Console.WriteLine($"Error: {error}");

            consumer.OnConsumeError += (_, msg)
                => Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");

            consumer.Subscribe("topic");

            bool cancelled = false;
            while (!cancelled)
            {
                consumer.Poll(TimeSpan.FromMilliseconds(100));
            }
        }

```

Was this page helpful?
0 / 5 - 0 ratings

Related issues

SaMirzaei picture SaMirzaei  路  4Comments

michael-huxtable picture michael-huxtable  路  4Comments

MihaiComan87 picture MihaiComan87  路  3Comments

kvandake picture kvandake  路  3Comments

vinodres picture vinodres  路  4Comments