Is there any way for a Kafka consumer to receive messages in batches of a fixed count?
kafka-node provides the fetchMaxBytes option, but we want a count option because message sizes vary a lot in our case.
I want to perform an asynchronous operation on 'n' messages at a time, then fetch the next 'n' messages. Any suggestions are welcome!
Thanks!
This is not something kafka-node supports. Is this a feature supported by the Java version?
You can achieve something similar by pausing the consumer immediately after fetching and resuming it after processing your messages, but afaik there's no way to fetch a specific number n of messages.
Thanks @hyperlink for the quick response. I am not sure about the Java version.
Yes, I think pause and resume with autoCommit set to false seems to be the only way to handle batches of async operations.
I can limit the number of messages per fetch by setting fetchMaxBytes (so that I don't receive a very large number of messages) and count the messages as they arrive. Once the count reaches the required 'n', I pause the consumer, process the messages, and then manually commit the offset of the last message processed. Then I resume the consumer so that I start receiving messages from the next offset and begin the next batch. This approach may solve the problem.
Any suggestions for a better approach would be greatly appreciated!
Since a batch of messages is retrieved per fetch, you should call pause right after receiving the first message. If the messages are processed asynchronously, there is a strong possibility that other fetches will occur while you are still processing incoming messages. message events are emitted synchronously.
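A minimal sketch of that suggestion, for illustration only. The helper name consumeFetchByFetch and the processBatch callback are hypothetical; the only consumer methods used are on, pause and resume. It relies on the statement above that message events are emitted synchronously, so the rest of the current fetch is assumed to be buffered before the setImmediate callback runs. Note that the batch size here is whatever one fetch returned, not a fixed 'n'.

```javascript
// Hypothetical helper: `consumer` is anything exposing on/pause/resume the way
// a kafka-node Consumer does; `processBatch` is your own async handler.
function consumeFetchByFetch(consumer, processBatch) {
  let buffer = [];
  let draining = false;

  consumer.on('message', (message) => {
    buffer.push(message);
    if (draining) return;

    draining = true;
    consumer.pause(); // ask the consumer to stop fetching while we work

    // Assumption: the rest of this fetch is emitted synchronously, so it is
    // fully buffered by the time this setImmediate callback runs.
    setImmediate(async () => {
      try {
        // Drain everything buffered so far, including late arrivals.
        while (buffer.length > 0) {
          const batch = buffer;
          buffer = [];
          await processBatch(batch);
        }
        draining = false;
        consumer.resume();
      } catch (err) {
        // Left paused on failure so that nothing is skipped silently.
        console.error('batch failed, consumer stays paused', err);
      }
    });
  });
}
```

It would be called as consumeFetchByFetch(consumer, async (batch) => { ... }) on a consumer created with autoCommit set to false; committing offsets is left out of the sketch.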
@hyperlink There is no need to pause the consumer after receiving the first message. Correct me if I am missing anything.
This is the approach:
We can keep listening for messages until we reach the desired count.
Suppose we have to process 100 messages at a time and the offsets start from 0. Once our count reaches 100, we store the offset of the 100th message (offset 99) and pause the consumer. Even if we receive further messages, we are not going to process them.
Once these 100 messages are processed, we set the consumer's offset to 100 (the stored offset + 1) and resume the consumer.
We then start receiving new messages from offset 100, and so on.
I only need to take care of the partition while setting the offsets.
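The steps above could be sketched roughly as follows. This is an illustration under assumptions, not a tested integration: consumeInBatches and processBatch are hypothetical names, and the only consumer methods used are on, pause, resume and setOffset(topic, partition, offset). To take care of partitions, the sketch records the next offset to read for each topic/partition, including partitions whose messages were only ever skipped. Offset committing is left out.

```javascript
// Hypothetical helper: collect `batchSize` messages, pause, process them,
// rewind each partition to its first unprocessed offset, then resume.
function consumeInBatches(consumer, batchSize, processBatch) {
  let batch = [];
  let busy = false;
  const next = new Map(); // "topic:partition" -> next offset to read

  consumer.on('message', (message) => {
    const key = `${message.topic}:${message.partition}`;
    const { topic, partition, offset } = message;

    if (busy) {
      // Surplus message: skip it, but remember where to re-read from if this
      // partition contributed nothing to the current batch.
      if (!next.has(key)) next.set(key, { topic, partition, offset });
      return;
    }

    batch.push(message);
    next.set(key, { topic, partition, offset: offset + 1 });
    if (batch.length < batchSize) return;

    busy = true;
    consumer.pause();
    const current = batch;
    batch = [];

    Promise.resolve()
      .then(() => processBatch(current))
      .then(() => {
        for (const p of next.values()) {
          consumer.setOffset(p.topic, p.partition, p.offset);
        }
        next.clear();
        busy = false;
        consumer.resume();
      })
      .catch((err) => {
        // Left paused on failure so the batch is not silently lost.
        console.error('batch failed, consumer stays paused', err);
      });
  });
}
```

With a batch size of 3 and messages at offsets 0 to 4 arriving from one partition in a single fetch, the handler receives offsets 0, 1 and 2, and the consumer is rewound to offset 3 before resuming.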
Thanks!
@vnsrahul1304 yes, your approach will work if message processing is performed synchronously; otherwise what I mentioned previously applies.
You can take a look at the concurrency recommendation here. There are other abstractions written to deal with the same issue, such as sinek, though I haven't tested it, so ymmv. Good luck!
@hyperlink The above solutions seem to work well.
Thanks a lot!
You can close this question now.
How should I fetch data in batches with kafka-node? How can I control the flow on the consumer side?
Whenever consumer.on("message", function (message) { console.log(message) }) is called, all the data from Kafka gets emitted. I need the data in batches of a particular size. How do I fix the batch size with fetchMaxBytes, and how do I fetch at a particular time interval?
Please advise.