Kafka-node: How to commit manually in a consumer group?

Created on 7 Jun 2017 · 5 Comments · Source: SOHU-Co/kafka-node

I am committing offsets as follows, but after restarting the consumer, old messages reappear:

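function onMessage(data) {
    console.log(data.offset);
    // Commit the next offset to read (processed offset + 1) for group 'calllog'
    offset.commit('calllog', [{ topic: 'calllog', partition: data.partition, offset: data.offset + 1 }],
    function (err, result) {
        if (err) return console.error(err);
        console.log(result);
    });
}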

Is there something wrong with the way I am committing offsets?

Most helpful comment

I have encountered the same issue as @chetandev, and I assume it is a synchronization issue in the code.

What I have seen is that if autoCommit is true, each message is consumed and committed as expected.
But when switching autoCommit to false and trying to commit manually, I get the message "Commit not needed" in the callback's "data" argument.

In some tests I noticed that it happens when the code is structured something like this:

consumer.on('message', message => {
    consumer.commit((error, data) => {
        // Here: data == "Commit not needed"
    });
});

Running this code results in the same message being consumed again each time the consumer restarts (assuming no commits are made after this section).

By wrapping the commit in a timeout of 0, the commit seems to work (probably because it then runs after exiting the on('message') handler context), like so:

consumer.on('message', message => {
    setTimeout(() => {
        consumer.commit((error, data) => {
            // Here the commit will work as expected
        });
    }, 0);
});
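
The workaround above could be wrapped in a small helper so that every handler defers its commit in the same way. This is only a sketch: commitAfterHandler is a hypothetical name, not part of kafka-node, and it assumes nothing about the consumer beyond the commit(callback) call shown above.

```javascript
// Hypothetical helper (not a kafka-node API): defer consumer.commit() until
// the current 'message' handler has returned, and expose the result as a Promise.
function commitAfterHandler(consumer) {
  return new Promise((resolve, reject) => {
    // setTimeout(..., 0) schedules the commit for a later event-loop turn,
    // so it runs outside the handler that called this function.
    setTimeout(() => {
      consumer.commit((error, data) => {
        if (error) reject(error);
        else resolve(data);
      });
    }, 0);
  });
}

// Usage inside a handler (consumer is assumed to exist already):
// consumer.on('message', message => {
//     commitAfterHandler(consumer)
//         .then(data => console.log('commit result:', data))
//         .catch(error => console.error('commit failed:', error));
// });
```

Returning a Promise keeps the deferred commit and its error handling in one place, so a failed commit is not silently dropped.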

All 5 comments

If you are using ConsumerGroups (which you should), all you have to do is call consumerGroup.commit((err, data) => {}), and it should commit your current offsets accordingly.
Just make sure to disable autoCommit in your config.

Please also make sure that you are NOT using different ConsumerGroup names, as the broker stores the offset information under the ConsumerGroup's name. Therefore, always stick to the same ConsumerGroup name whenever you start your script/service.
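
As a rough illustration of the two points above (autoCommit disabled, group name kept stable), a configuration might look like the sketch below. The broker address and group name are hypothetical placeholders, and createConsumerGroup is an illustrative helper, not something from kafka-node; the constructor is passed in so that the snippet does not need a running broker.

```javascript
// Sketch only: kafkaHost, groupId and autoCommit are ConsumerGroup options in
// kafka-node; the values used here are hypothetical placeholders.
const consumerGroupOptions = {
  kafkaHost: 'localhost:9092',   // assumed broker address
  groupId: 'calllog-consumers',  // hypothetical name; keep it identical across restarts
  autoCommit: false              // offsets are committed manually via commit()
};

// Illustrative helper: in a real service, ConsumerGroup would come from
// require('kafka-node').ConsumerGroup.
function createConsumerGroup(ConsumerGroup, topics) {
  return new ConsumerGroup(consumerGroupOptions, topics);
}

// Usage (assuming kafka-node is installed and a broker is reachable):
// const { ConsumerGroup } = require('kafka-node');
// const consumerGroup = createConsumerGroup(ConsumerGroup, ['calllog']);
```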

Hi, I tried consumerGroup.commit((err, data) => {}), but it is not working. The output says "Commit not needed".

Are you sure autoCommit is disabled?

yes
