Hi,
I currently use Kafka as a messaging system for IoT. Our current architecture supports one device per partition. Ideally, we would consume every buffered message for one partition/device before switching to the next. That would give us the behavior we need and avoid polling every partition and then re-sorting the messages into internal queues, which tends to create memory bottlenecks.
Is it possible to consume from a specific partition's queue, or could these queues be exposed in the API?
Best Regards,
Jonathan Schmidt
You don't need to use the consumer group functionality - you can directly assign to the partition(s) you want (see the simple consumer example). Note that when you call assign, the old assignment is revoked - it does not add to the old assignment.
Actually, the consumer group functionality is very interesting to us, as it allows automatic balancing of the workload among the workers, with "zombie" consumer detection performed by the Kafka broker. This is an important part of our choice.
Or do you suggest using one consumer per queue to get a single-file polling behavior? As the number of devices gets higher, I can see that causing problems down the line...
@mhowlett Thanks for the previous answer. As stated, it would impact our scalability, as we'd have to replicate the consumer group functionality and/or create a consumer for each TopicPartition. Given the number of TopicPartitions we use (one per device, so upward of 5000), this seems unwieldy. I was under the impression that librdkafka internally has one queue per partition and merges everything into another queue for later consumption. Am I missing an implementation difficulty that would prevent me from consuming these specific queues?
@jschmidtWK That is indeed what librdkafka does under the hood, and the C/C++ APIs allow fine-grained control over these queues. But since this is quite a rare use-case it hasn't (yet?) made it into the higher level bindings.
I see, thanks for the confirmation. There might be a way around it, although a less predictable one: how does the merging take place? Is every queue consumed fully before going to the next, or are the messages served in a round-robin or timestamp-based sort of way?
https://github.com/edenhill/librdkafka/wiki/FAQ#how-are-partitions-fetched
My apologies for not reading the documentation on this topic, thank you for the reference.
I just wrote it ;)
Thank you then, this is a very helpful document and answers my question about a possible workaround.
Back to the initial topic: is an API akin to Consumer.Consume(TopicPartition tp) something that would be possible?
Hi @edenhill, @mhowlett,
Any thoughts on allowing a partition-specific Consume or otherwise exposing the various TopicPartition queues?
The fine-grained control over partition queues that @edenhill is talking about is not something we're going to consider exposing in the short term. But come to think of it, the API does allow you to do what you want. After subscribing to a topic, a consumer gets allocated a set of partitions - but you have complete control over which of these you actually assign to and read from. So hook into the OnPartitionsAssigned and OnPartitionsRevoked events to maintain a list of allocated partitions. Don't Assign to all of these (or any of them) in OnPartitionsAssigned, however - instead, manage which partition in the set you're currently assigned to from within the poll loop. Cycle through the partitions one at a time, consuming until you get an OnPartitionEOF event. When that happens, switch to the next partition.
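For readers following along, the cycling loop described above can be sketched roughly as follows. This is a minimal Python illustration against an in-memory stand-in, not the .NET client: `FakeConsumer`, its `assign`/`poll` methods, and the `EOF` sentinel are all hypothetical names made up for this example, and the real client signals end-of-partition through an event rather than a return value.

```python
from collections import deque

EOF = object()  # hypothetical sentinel standing in for a partition-EOF event


class FakeConsumer:
    """In-memory stand-in for a consumer; NOT a real Kafka client API."""

    def __init__(self, backlog):
        # One queue of buffered messages per partition id.
        self._queues = {p: deque(msgs) for p, msgs in backlog.items()}
        self._current = None

    def assign(self, partition):
        # Absolute, as described in the thread: replaces the previous assignment.
        self._current = partition

    def poll(self):
        queue = self._queues[self._current]
        return queue.popleft() if queue else EOF


def drain_in_turn(consumer, allocated):
    """Consume each allocated partition until EOF, then move to the next."""
    consumed = []
    for partition in allocated:
        consumer.assign(partition)
        while True:
            msg = consumer.poll()
            if msg is EOF:
                break  # this partition is drained; switch to the next one
            consumed.append((partition, msg))
    return consumed


backlog = {0: ["a1", "a2"], 1: [], 2: ["c1"]}
result = drain_in_turn(FakeConsumer(backlog), allocated=[0, 1, 2])
print(result)
```

In a real consumer the `allocated` list would be kept up to date from the assigned/revoked callbacks, and the loop would have to tolerate partitions disappearing mid-cycle.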
The assign() strategy is a no go due to the "long" startup time when a partition is switched. My use case involves a few thousand partitions at once (IoT case with one partition per device), so the iteration takes too long.
For now, and until the internal queues are exposed, I'm just artificially limiting the consumption rate when the buffer system on my end reaches a hundred thousand messages. Not as pretty as I'd like, but it works.
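The throttling workaround mentioned above could look something like the sketch below. It is written in Python with a plain iterator standing in for Poll(), so none of it is the actual client API; `run_throttled`, `handle`, and `batch_size` are hypothetical names, and only the hundred-thousand-message threshold comes from the thread (the demo uses a much smaller value).

```python
from collections import deque

_DONE = object()  # marks an exhausted source in this simulation


def run_throttled(source, handle, high_watermark, batch_size):
    """Poll only while the local buffer is below high_watermark.

    `source` stands in for repeated Poll() calls and `handle` for the
    downstream worker. Returns the peak buffer size that was observed.
    """
    buffer = deque()
    peak = 0
    messages = iter(source)
    exhausted = False
    while not exhausted or buffer:
        if not exhausted and len(buffer) < high_watermark:
            msg = next(messages, _DONE)
            if msg is _DONE:
                exhausted = True
            else:
                buffer.append(msg)
                peak = max(peak, len(buffer))
            continue
        # Buffer is full (or the source is done): stop polling and drain a batch.
        for _ in range(min(batch_size, len(buffer))):
            handle(buffer.popleft())
    return peak


handled = []
peak = run_throttled(range(10), handled.append, high_watermark=4, batch_size=2)
print(peak, handled)
```

The point of the design is simply that the buffer size is bounded by the watermark, at the cost of leaving messages in the client's own queues for longer.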
@jschmidtWK It sounds like you're not using the balanced consumer (subscribe()) but assigning thousands of partitions directly with assign(). Since assign() is an absolute API, not an incremental one, each invocation has a cost: stopping fetchers, clearing out existing queues, looking up new offsets, and starting fetchers again.
librdkafka's legacy consumer (which the new high-level consumer is built upon) provides an incremental per-partition consume_start/stop() API that lets you add and delete partitions at runtime without affecting the operation of other partition fetchers.
Since this is quite a narrow use-case we don't have any plans to expose this API in the .NET client, but I wanted to let you know that there is indeed support underneath for what you seem to want, and we would be willing to accept a high-quality PR that adds such APIs to the .NET client.
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L2103
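To make the absolute-versus-incremental distinction concrete, here is a toy cost model in Python. It assumes, purely for illustration, that every absolute assignment restarts a fetcher for each partition in the new set, while an incremental start touches only the one partition; the class names and the `fetcher_starts` counter are hypothetical, and librdkafka's real costs will differ.

```python
class AbsoluteAssigner:
    """Toy model: each assign() replaces the whole set and restarts every fetcher."""

    def __init__(self):
        self.active = set()
        self.fetcher_starts = 0

    def assign(self, partitions):
        self.active = set(partitions)
        self.fetcher_starts += len(self.active)  # assumed cost, not measured


class IncrementalAssigner:
    """Toy model: start/stop touch only the named partition."""

    def __init__(self):
        self.active = set()
        self.fetcher_starts = 0

    def start(self, partition):
        if partition not in self.active:
            self.active.add(partition)
            self.fetcher_starts += 1

    def stop(self, partition):
        self.active.discard(partition)


def grow_one_at_a_time(n):
    """Add n partitions one by one under both models and compare the cost."""
    absolute, incremental = AbsoluteAssigner(), IncrementalAssigner()
    for p in range(n):
        absolute.assign(range(p + 1))  # must restate the full set every time
        incremental.start(p)
    return absolute.fetcher_starts, incremental.fetcher_starts


abs_cost, inc_cost = grow_one_at_a_time(5000)
print(abs_cost, inc_cost)
```

Under this assumption the absolute model grows quadratically with the number of partitions while the incremental one grows linearly, which is the intuition behind the per-partition start/stop API.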
I'm actually using a Subscribe() on one topic and mirroring the Assign() manually on another (two kinds of message per device, each has its own topic with one partition per device).
Oh, ok :+1:
My apologies if I was curt, such was not my intent. Your answer about message prioritization actually allowed the system to work more than well enough for a production environment.
I'll consider creating a PR to expose the underlying Queues if the current solution creates a bottleneck down the line, but as it stands I doubt it since all I'm doing is throttling Poll().
Thanks again for the great support, this lib is great 👍