I am committing offsets as follows, but after restarting the consumer, old messages reappear:
function onMessage(data) {
    console.log(data.offset)
    offset.commit('calllog', [{ topic: 'calllog', partition: data.partition, offset: data.offset + 1 }],
        function(err, data) {
            console.log(data)
        });
}
Is there something wrong with the way I am committing offsets?
If you are using ConsumerGroups (which you should), all you have to do is call consumerGroup.commit((err, data) => {}) and it should commit your current offsets accordingly.
Just make sure to disable autoCommit in your config.
Please also make sure that you are NOT using different ConsumerGroup names, as the broker stores the offset information using a ConsumerGroup's name. Therefore always stick to the same ConsumerGroup whenever you start your script/service.
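To make the two points above concrete, here is a minimal sketch of a ConsumerGroup options object with autoCommit disabled and a fixed group name. The broker address 'localhost:9092' and the group name 'calllog-consumers' are placeholders I am assuming for illustration; only the topic name 'calllog' comes from the question.

```javascript
// Sketch of kafka-node ConsumerGroup options for manual commits.
// kafkaHost and groupId values are assumed placeholders, not from the thread.
const consumerGroupOptions = {
    kafkaHost: 'localhost:9092',   // assumption: a local broker
    groupId: 'calllog-consumers',  // keep this identical across restarts
    autoCommit: false              // commits are then made manually
};

// With kafka-node installed, the options would be passed like this:
// const kafka = require('kafka-node');
// const consumerGroup = new kafka.ConsumerGroup(consumerGroupOptions, ['calllog']);
```

Because the broker stores offsets per group name, a groupId generated at startup (for example from a timestamp or random suffix) would make every restart look like a brand-new consumer.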
Hi, I tried consumerGroup.commit((err, data) => {}), but it is not working. The output says "Commit not needed".
Are you sure autoCommit is disabled?
yes
I have encountered the same issue as @chetandev, and I assume it is a sync issue in the code.
What I have seen is that if autoCommit is true, each message is consumed and committed as expected.
But when switching autoCommit to false and trying to commit manually, I get the message "Commit not needed" in the callback's "data" argument.
With some tests, I have noticed it happens when the code is structured something like this:
consumer.on('message', message => {
    consumer.commit((error, data) => {
        // Here: data == "Commit not needed"
    });
});
Running this code results in the same message being consumed again each time the consumer starts up (assuming no commits are made after this section).
With a timeout of 0, the commit seems to work (probably because it exits the on('message') handler context), like so:
consumer.on('message', message => {
    setTimeout(() => {
        consumer.commit((error, data) => {
            // Here the commit will work as expected
        });
    }, 0);
});
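If the deferral is needed in several handlers, it could be wrapped in a small helper. This is only a sketch of the workaround above, not a kafka-node API: the name commitAfterHandler and its schedule parameter are hypothetical, and it only assumes that the consumer exposes commit(callback) as used in this thread. The root cause is not confirmed here, so treat it as a workaround rather than a fix.

```javascript
// Hypothetical helper (not part of kafka-node): schedules consumer.commit
// for a later event-loop turn, so it runs after the 'message' handler returns.
// `schedule` defaults to setTimeout and is injectable only to ease testing.
function commitAfterHandler(consumer, callback, schedule = setTimeout) {
    schedule(() => consumer.commit(callback), 0);
}

// Usage inside a handler, mirroring the snippet above:
// consumer.on('message', message => {
//     commitAfterHandler(consumer, (error, data) => {
//         console.log(error || data);
//     });
// });
```

The helper does not change what gets committed; it only changes when commit is called relative to the handler.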