I'm looking for a way to process messages one at a time: pause the fetch cycle, load an async queue, and call the back end before committing each message's offset. I'm using a ConsumerGroup, and `offset.commit` returns a successful response, but restarting the node process fetches all the committed messages again.
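A minimal sketch of the pipeline described above (pause fetching, queue messages, call the back end, then commit), assuming only that the consumer object exposes `pause()` and `resume()`. `createSequentialProcessor`, `callBackend` and `commitOffset` are hypothetical names; `commitOffset` would wrap whichever commit call actually works for your group type. Retries are left out: if the back-end call rejects, that message is not committed, the current drain ends, and the error surfaces through the returned promise.

```javascript
// Minimal sketch: handle fetched messages strictly one at a time.
// Assumes `consumer` exposes pause()/resume(); `callBackend` and
// `commitOffset` are hypothetical async functions you supply.
function createSequentialProcessor(consumer, callBackend, commitOffset) {
  const queue = [];
  let busy = false;

  async function drain() {
    if (busy) return; // a drain is already running and will pick up new messages
    busy = true;
    consumer.pause(); // stop fetching while the backlog is worked off
    try {
      while (queue.length > 0) {
        const message = queue.shift();
        await callBackend(message); // a rejection here skips the commit and ends this drain
        await commitOffset(message); // commit only after the back end succeeded
      }
    } finally {
      busy = false;
      consumer.resume(); // fetch again once the queue is empty (or after a failure)
    }
  }

  // Call this from your 'message' handler and handle the returned promise's rejection.
  return function onMessage(message) {
    queue.push(message);
    return drain();
  };
}
```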
```javascript
var kafka = require('kafka-node');
var ConsumerGroup = kafka.ConsumerGroup;

var consumerOptions = {
  host: 'host_name',
  groupId: 'consumer_group_id',
  sessionTimeout: 16000,
  protocol: ["roundrobin"],
  encoding: "buffer",
  fetchMinBytes: 1,
  fetchMaxBytes: 1024 * 1024,
  autoCommit: false
};

var myConsumer = new ConsumerGroup(Object.assign({id: "consumer_id"}, consumerOptions), "my_topic");
var myConsumerClient = myConsumer.client;
var myConsumerOffset = new kafka.Offset(myConsumerClient);
```
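```javascript
// current_offset: offset of the message whose processing just finished
myConsumerOffset.commit('consumer_group_id', [
  { topic: 'my_topic', partition: 0, offset: current_offset }
], function (err, data) {
  if (err) {
    console.log("error: " + err);
    return; // don't report a commit that failed
  }
  console.log(data);
  console.log("Message with offset " + current_offset + " commited");
});
```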
```
INFO: - Fetched message with offset: 6804
INFO: - Fetched message with offset: 6805
INFO: - Fetched message with offset: 6806
INFO: - Fetched message with offset: 6807
Backend called
{ 'my_topic': { partition: 0, errorCode: 0 } }
Message with offset 6804 commited
Backend called
{ 'my_topic': { partition: 0, errorCode: 0 } }
Backend called
Message with offset 6805 commited
{ 'my_topic': { partition: 0, errorCode: 0 } }
Backend called
Message with offset 6806 commited
{ 'my_topic': { partition: 0, errorCode: 0 } }
Backend called
Message with offset 6807 commited
```
I restart the process at this point and see that offsets 6804 to 6807 are fetched again. I expected each of them to have been committed individually, but that is not the case.
Unfortunately, the `Offset` class's `commit` only works with the old ZooKeeper-based groups, not with the new Kafka-based consumer group.
I would recommend `ConsumerGroupStream` if you want offsets committed automatically as messages are read.
@hyperlink Thanks for responding to this.
Is there any API in `ConsumerGroupStream` for managing offsets manually? I would like to commit only those offsets which I've confirmed have successfully finished processing.
EDIT: After digging through the source, I discovered that all I needed to do was call `consumerGroup.commit(message)` after processing, where `message` contains the properties `topic`, `partition`, and `offset`. The commits are still queued as normal and use the `autoCommitIntervalMs` option to determine the interval timing. I hope this helps someone.
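The queuing behaviour described in the EDIT can be pictured with a small model: `commit(message)` only records the latest offset per topic/partition, and a timer flushes that record every `autoCommitIntervalMs`. This illustrates the idea only and is not kafka-node's actual source; `CommitQueue` and `sendCommits` are hypothetical. The sketch records `message.offset` unchanged; Kafka itself treats a committed offset as the next offset to read, so check whether your client adds 1 for you.

```javascript
// Model of a queued commit: commit(message) only records the offset, and a
// timer flushes the latest recorded offset per topic/partition every
// autoCommitIntervalMs. `sendCommits` is a hypothetical async function
// standing in for the real offset-commit request. 5000 is an arbitrary default.
class CommitQueue {
  constructor(sendCommits, autoCommitIntervalMs = 5000) {
    this.sendCommits = sendCommits;
    this.pending = new Map(); // "topic:partition" -> { topic, partition, offset }
    this.timer = setInterval(() => {
      // This sketch logs and drops a failed batch instead of retrying it.
      this.flush().catch((err) => console.error('commit failed:', err));
    }, autoCommitIntervalMs);
    this.timer.unref(); // don't keep the Node process alive just for commits
  }

  // Record the offset; a later call for the same partition replaces it.
  commit(message) {
    const { topic, partition, offset } = message;
    this.pending.set(`${topic}:${partition}`, { topic, partition, offset });
  }

  // Send everything recorded since the last flush as one batch.
  async flush() {
    if (this.pending.size === 0) return;
    const batch = [...this.pending.values()];
    this.pending.clear();
    await this.sendCommits(batch);
  }

  // Stop the timer and flush whatever is still pending.
  close() {
    clearInterval(this.timer);
    return this.flush();
  }
}
```

Because only the latest offset per partition is kept, calling `commit` after every processed message costs nothing extra: at most one commit per partition goes out per interval.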
Does `autoCommit` work with the ConsumerGroup? The example by @muttal implies yes; the response by @hyperlink implies no.
By default, does it commit or not?
And this says that ConsumerGroup does commit:
> ConsumerGroupStream manages auto commits differently than ConsumerGroup. Whereas the ConsumerGroup would automatically commit offsets of fetched messages the ConsumerGroupStream will only commit offsets of consumed messages from the stream buffer.
That says to me that a ConsumerGroup should be auto-committing?
@deitch Yes, both ConsumerGroupStream and ConsumerGroup will auto-commit for you. The ConsumerGroup commits messages received in the fetch cycle, while the stream version commits what is "consumed", i.e. popped off the buffer. Hope that helps.
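To make the difference concrete, here is a toy model of which offset each policy would treat as committable after one fetch cycle. `autoCommitPosition` is a hypothetical helper, not a kafka-node function, and the model ignores commit timing entirely.

```javascript
// Toy model of the two auto-commit policies described above.
// A fetch cycle delivered `fetched` (ascending offsets) into a buffer;
// the application has read (popped) only the first `consumedCount` of them.
function autoCommitPosition(fetched, consumedCount, policy) {
  if (policy === 'fetched') {
    // ConsumerGroup-style: everything delivered by the fetch cycle counts.
    return fetched.length > 0 ? fetched[fetched.length - 1] : null;
  }
  if (policy === 'consumed') {
    // ConsumerGroupStream-style: only what was popped off the buffer counts.
    const n = Math.min(consumedCount, fetched.length);
    return n > 0 ? fetched[n - 1] : null;
  }
  throw new Error(`unknown policy: ${policy}`);
}

const fetched = [6804, 6805, 6806, 6807];
console.log(autoCommitPosition(fetched, 2, 'fetched'));
console.log(autoCommitPosition(fetched, 2, 'consumed'));
```

Under the `'fetched'` policy, if that position were committed just before a crash, a restart would resume after 6807 even though 6806 and 6807 were never read from the buffer; the `'consumed'` policy would resume after 6805.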
It does indeed, thank you.
Hey @kryptx, thank you for the tip about managing offsets manually; it saved me some time digging through the code to find that.
I think it would be a good idea to document this in the README, as it isn't clear from the docs.
Edit: I found it in the README: https://github.com/SOHU-Co/kafka-node#commitmessage-force-callback
Is there no way of marking an offset as "processed"? In the ConsumerGroupStream example, you commit every "consumed" message (in other words, every message that has been popped off the buffer). However, processing of that message may fail _after_ consumption. Is there no way to mark a message as processed and then have the ConsumerGroupStream commit the highest offset up to which every consumed message has been processed, ignoring higher offsets that are not yet processed?
In other words, let's imagine our buffer contains offsets 1, 2 and 4, and we pop 1, 2 and 4. We then finish processing 1, which marks it as processed, so the autocommit commits only offset 1. By the next interval, 2 and 4 are processed, and the autocommit commits offset 4.
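This bookkeeping can be done on the application side. A minimal sketch, assuming a single partition and that offsets are reported to the tracker in the order they were popped from the buffer; `ProcessedOffsetTracker` is a hypothetical helper, not part of kafka-node. Using the offsets 1, 2 and 4 from the example, the committable offset only advances past offsets that have been processed, and gaps in the offset sequence are harmless because only consumed offsets are tracked.

```javascript
// Tracks consumed offsets for one partition and reports the highest offset
// that is safe to commit: every consumed offset up to and including it has
// been processed. Offsets may have gaps (e.g. 1, 2, 4).
class ProcessedOffsetTracker {
  constructor() {
    this.inFlight = [];      // consumed offsets, in consumption (ascending) order
    this.done = new Set();   // processed offsets that are still in inFlight
    this.committable = null; // highest safe offset so far
  }

  consumed(offset) {
    this.inFlight.push(offset);
  }

  processed(offset) {
    this.done.add(offset);
    // Advance past the processed prefix; stop at the first unfinished offset.
    while (this.inFlight.length > 0 && this.done.has(this.inFlight[0])) {
      const head = this.inFlight.shift();
      this.done.delete(head);
      this.committable = head;
    }
    return this.committable;
  }
}

const tracker = new ProcessedOffsetTracker();
[1, 2, 4].forEach((offset) => tracker.consumed(offset));
console.log(tracker.processed(1)); // offset 1 can be committed
console.log(tracker.processed(4)); // still 1: offset 2 is unfinished
console.log(tracker.processed(2)); // now everything up to 4 is done
```

With `autoCommit` disabled, the offset returned by `processed()` is what you would hand to your manual commit call; whether that call expects the processed offset or the next one to read depends on the client.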
@pontusarfwedson I have considered making the ConsumerGroupStream writable in the future, so that you can just write your messages to it to commit them. To mark a message as processed, you should be able to disable `autoCommit` and manually call the `commit` method, passing the original message.