Hey guys,
As the title states, I was wondering if there was a convenient way to block Poll() from consuming any more messages until all the current OnMessage events are complete.
My scenario is we are pulling messages from a busy queue, enriching the messages and forwarding them through a web socket.
When the client connected to us can't receive the messages fast enough, our app keeps pulling messages into memory and we end up with a huge number of objects (messages, async continuations, etc.) on the heap, and the allocator balloons the application's memory usage to compensate (upwards of 2-3 GB).
Ideally, what we would want to do is only continue pulling messages if we aren't waiting on any OnMessage events to complete, i.e. we pull x number of messages and, until they have been sent successfully by the web socket, don't pull any more.

Here is an image of a test app I use to verify this (note, I don't enrich the messages in this app). When I receive a message I increment a received count, then wait on socket.SendAsync, and after that has finished (sent), I increment a sent count.
As you can see, midway in the application, I've pulled nearly 500k messages but only sent 100k so far, and the application's process memory is building up because of the messages we still have. Instead I'd only ever want the received messages to be, maybe 20k ahead of the sent messages (via configuration of the poll)
Hopefully this makes sense, sorry if there is an easy way around this that I haven't found yet. Let me know if you have any questions about my scenario.
Two things:
You could add a counter that is incremented when you enter OnMessage and decremented when you exit it, and make your while (true) loop depend on this counter instead (while (nbMessageTreated < 10), for example). Mind thread safety, since you are going async.
Internally, Poll consumes from librdkafka's private queue, but this is not directly tied to how librdkafka fetches data from the broker. So even if you don't call Poll, librdkafka will still fetch data, up to 1 GB by default.
There is an rd_kafka_pause_partitions API in librdkafka, but we don't expose it yet.
What you can do is reduce queued.max.messages.kbytes (1_000_000 by default, so 1 GB in librdkafka) so that librdkafka stops queueing more data once that limit is reached, until you Poll and free some space in the internal queue. There are other related configuration properties; you can find them all, with their usage, here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
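To make the counter idea concrete, here is a minimal, language-neutral sketch in Python rather than C#. It does not use the Kafka client at all: the `for` loop stands in for the Poll loop, `slow_send` stands in for socket.SendAsync, and `MAX_IN_FLIGHT`, `pump` and `run_demo` are hypothetical names made up for illustration. A semaphore plays the role of the counter, so the loop stops pulling while too many sends are pending. In .NET you would need a thread-safe equivalent (something like SemaphoreSlim), which is an assumption on my part and not something the client provides.

```python
import asyncio

MAX_IN_FLIGHT = 10  # hypothetical cap on messages received but not yet sent


async def slow_send(message: str) -> None:
    """Stand-in for socket.SendAsync: pretend the client is slow."""
    await asyncio.sleep(0.001)


async def pump(messages, max_in_flight: int = MAX_IN_FLIGHT) -> dict:
    """Forward messages, never holding more than max_in_flight unsent ones."""
    slots = asyncio.Semaphore(max_in_flight)
    stats = {"received": 0, "sent": 0, "max_gap": 0}
    tasks = []

    async def handle(message: str) -> None:
        try:
            await slow_send(message)
            stats["sent"] += 1
        finally:
            slots.release()  # free a slot so the loop may pull again

    for message in messages:   # stands in for the Poll loop
        await slots.acquire()  # waits here while too many sends are pending
        stats["received"] += 1
        gap = stats["received"] - stats["sent"]
        stats["max_gap"] = max(stats["max_gap"], gap)
        tasks.append(asyncio.create_task(handle(message)))

    await asyncio.gather(*tasks)
    return stats


def run_demo(total: int = 200, max_in_flight: int = MAX_IN_FLIGHT) -> dict:
    """Run the pump over `total` fake messages and return the counters."""
    messages = (f"msg-{i}" for i in range(total))
    return asyncio.run(pump(messages, max_in_flight))
```

Calling `run_demo()` returns the received and sent counts plus the largest gap observed between them, which should never exceed the cap. Note that this only bounds what your own code holds; as described above, librdkafka keeps prefetching into its internal queue regardless.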
Some other things:
Anyway, there is still one issue with all those methods: if you stop polling, you won't send your commit either, or get metadata, or error, etc. We should implement the pause/resume api for this use case ( @mhowlett )
thanks for the replies @treziac :-)
I notice we're just about to add pause/resume in the Go client. We should do this here too; it'll be easy.
Hey guys,
What we've decided on is to just use Consume on our end in order to achieve this. I did notice the comment on the Consume method saying to provide a reason why we'd use this instead of poll (which we've used before).
Basically, our use case comes down to us wrapping the Kafka consumer in a 'Queue' interface and exposing a 'GetMessage' method, so we required the ability for another component that depends on the 'Queue' to selectively get messages.
After testing the potential performance increase in polling, storing a message in a buffer and allowing a component to pick messages out of the buffer, it seems keeping things simple and just exposing the Consume method provides just as good performance for our case (and much much less complexity)
Let me know if you want me to clarify this use case any further.
Cheers!
Thanks for the feedback @NicholasFaneDev! The Consume method has now proved its worth in a number of scenarios, and I just opened PR #295 to remove the unstable designation.
Most helpful comment
Some other things:
Consume may be handy in this case: rather than using Poll and OnMessage, simply call Consume with a low timeout (say 10 ms), and send your data either when Consume returns null (no more data in the private queue) or when your buffer reaches some size. This can still be combined with the other suggestions.
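A rough sketch of that loop, written in Python against a fake consumer rather than the real .NET client. Everything here is hypothetical and only for illustration: `drain`, `make_fake_consumer`, `demo`, `MAX_BATCH`, and the `max_idle_polls` stop condition, which exists only so the demo terminates (a real consumer loop would keep running).

```python
from collections import deque
from typing import Callable, List, Optional

MAX_BATCH = 3  # hypothetical buffer size that triggers a flush


def drain(consume: Callable[[float], Optional[str]],
          send_batch: Callable[[List[str]], None],
          timeout_s: float = 0.010,
          max_batch: int = MAX_BATCH,
          max_idle_polls: int = 1) -> None:
    """Pull with a short timeout; flush when the queue is empty or the buffer is full."""
    buffer: List[str] = []
    idle = 0
    while idle < max_idle_polls:
        message = consume(timeout_s)  # None means nothing arrived in time
        if message is None:
            idle += 1
        else:
            idle = 0
            buffer.append(message)
        if buffer and (message is None or len(buffer) >= max_batch):
            send_batch(buffer)  # blocks until sent, so nothing new is pulled meanwhile
            buffer = []


def make_fake_consumer(messages) -> Callable[[float], Optional[str]]:
    """Build a consume(timeout) function that serves messages from memory."""
    pending = deque(messages)

    def consume(timeout_s: float) -> Optional[str]:
        return pending.popleft() if pending else None

    return consume


def demo() -> List[List[str]]:
    """Drain five fake messages and return the batches that were sent."""
    sent: List[List[str]] = []
    consume = make_fake_consumer(["a", "b", "c", "d", "e"])
    drain(consume, lambda batch: sent.append(list(batch)))
    return sent
```

With five messages and a batch size of three, `demo()` produces one full batch followed by a partial one that is flushed when the fake consumer runs dry. Because the send happens inline, the loop cannot get ahead of the socket by more than one buffer.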