Confluent-kafka-dotnet: [Question] How to consume latest message in topic? v1.0-beta3

Created on 25 Apr 2019 · 5 Comments · Source: confluentinc/confluent-kafka-dotnet

Sorry, I'm new to Apache Kafka and find even such a simple concept confusing.

In my application I need an option to trigger the consumer on startup. That works fine if there are unread messages in the topic. But I also need to trigger the consumer with the latest available message even if that message was already read by this consumer group.

By googling I found a lot of suggestions to use .Seek, .Assign and .Position, but I don't see where to get the TopicPartitionOffset and TopicPartition objects that those methods need.

Using version v1.0-beta3

I was also wondering whether each consumer object opens its own connection. Is there a way to create a single shared Kafka connection, or is that handled automatically in the background?


All 5 comments

> Sorry, I'm new to Apache Kafka and find even such a simple concept confusing.

There's a bit of a learning curve, especially if you're not used to distributed systems, but absolutely worth the effort.

Assuming you're using subscribe (joining a consumer group), something like this should do the trick:

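// Requires: using System; using System.Linq; using Confluent.Kafka;
using (var consumer = new ConsumerBuilder<Null, string>(cConfig)
    // On each assignment, start every partition at its last message (high watermark - 1).
    .SetPartitionsAssignedHandler((c, ps) =>
    {
        return ps.Select(tp => new TopicPartitionOffset(tp, c.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(10)).High - 1));
    })
    .Build())
{
    // Subscribe to the topic and call consumer.Consume(...) here.
}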

Note that if you don't need the scale of consumer groups, you can just assign directly.
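For the direct-assignment route, here is a minimal sketch of what that could look like. It assumes the 1.x API, and the broker address, group id, topic name and partition number are placeholders, not values from this thread. It also treats an empty partition as a special case and starts at the end instead of at High - 1.

```csharp
using System;
using Confluent.Kafka;

class LatestMessageByAssign
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: local broker
            GroupId = "example-group",           // assumption: the client expects a group id even with Assign
            EnableAutoCommit = false             // nothing is committed in this sketch
        };

        using (var consumer = new ConsumerBuilder<Null, string>(config).Build())
        {
            // Hypothetical topic and partition; with Assign you pick partitions yourself.
            var tp = new TopicPartition("my-topic", 0);

            // Ask the broker for the current low/high watermarks of the partition.
            var watermarks = consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(10));
            long low = watermarks.Low.Value;
            long high = watermarks.High.Value;

            // High is the offset of the next message to be written, so the last
            // existing message sits at High - 1. If the partition holds no
            // messages, start at the end and wait for new ones.
            var start = high > low ? new Offset(high - 1) : Offset.End;

            consumer.Assign(new TopicPartitionOffset(tp, start));

            // Returns null if nothing arrives within the timeout.
            var result = consumer.Consume(TimeSpan.FromSeconds(10));
            if (result != null)
            {
                Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
            }

            consumer.Close();
        }
    }
}
```

With Assign there is no group rebalancing, so the application decides which partitions to read. For a multi-partition topic the same steps would be repeated per partition.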

Each consumer will maintain connections to all brokers it needs to communicate with. Typically, you should not have more than one consumer per process.

Thank you for the answer. I have one more question regarding the last sentence:
> Each consumer will maintain connections to all brokers it needs to communicate with. Typically, you should not have more than one consumer per process.

What if I have a central object holding a dictionary of many consumers, each with a different configuration and each calling .Consume() on its own thread?

Can you suggest a way to reduce the number of simultaneous connections in that case?

@mhowlett can you please answer the last question asked, so I can close this issue?

What configuration is different between your consumers?

If you can, use one consumer instance to read all messages and push them out to worker threads as appropriate.
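One way to read that suggestion, sketched under assumptions: a single consumer subscribes to every topic and hands each message to a per-topic queue that a worker task drains. The topic names, broker address, group id and queue capacity below are made up for illustration, and the worker body is a stand-in for real handling.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

class SingleConsumerDispatcher
{
    static void Main()
    {
        var topics = new[] { "topic-a", "topic-b" }; // hypothetical topic names
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: local broker
            GroupId = "example-group"            // assumption
        };

        // One bounded queue and one worker task per topic.
        var queues = new Dictionary<string, BlockingCollection<ConsumeResult<Null, string>>>();
        var workers = new List<Task>();
        foreach (var topic in topics)
        {
            var queue = new BlockingCollection<ConsumeResult<Null, string>>(boundedCapacity: 1000);
            queues[topic] = queue;
            workers.Add(Task.Run(() =>
            {
                // Per-topic handling goes here; this sketch only prints.
                foreach (var r in queue.GetConsumingEnumerable())
                {
                    Console.WriteLine($"[{r.Topic}] {r.Message.Value}");
                }
            }));
        }

        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

        using (var consumer = new ConsumerBuilder<Null, string>(config).Build())
        {
            consumer.Subscribe(topics);
            try
            {
                while (true)
                {
                    // Blocks until a message arrives or cancellation is requested.
                    var result = consumer.Consume(cts.Token);
                    // Add blocks when a queue is full, which slows the poll loop
                    // instead of letting memory grow without bound.
                    queues[result.Topic].Add(result);
                }
            }
            catch (OperationCanceledException)
            {
                // Ctrl+C: fall through to shutdown.
            }
            finally
            {
                consumer.Close();
                foreach (var q in queues.Values) q.CompleteAdding();
                Task.WaitAll(workers.ToArray());
            }
        }
    }
}
```

A single consumer has a single configuration, so settings such as EnableAutoCommit cannot differ per topic in this layout. Per-topic behaviour would have to live in the workers, for example by disabling auto commit and committing explicitly only for the topics that need it. With auto commit left on, offsets may be committed before a worker has finished processing a message.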

The consumers differ in their Topic and EnableAutoCommit values (more properties may differ in the future), and in some auxiliary logic that triggers the latest message on application startup (this can be enabled or disabled per consumer).

Each of these consumers waits for messages on its own thread, in parallel.

> if you can, use one consumer instance to read all messages and push them out to worker threads as appropriate.

If I understand correctly, that is only possible when Topic is the only property that differs between consumers, right?
