I would like to perform each message commit to Kafka manually, I'm using the commit function like this :
consumer.commit({ topic: metadata.topic, partition: metadata.partition, offset: metadata.offset }, function (err, data) { ... }
Could you tell me if this is a good way of using it ?
The problem is that even if the callback is triggered without errors, if I restart the consumer, it consumes the last committed message again (restart is made by calling disconnect and then reconnect or simply calling process.exit() function).
When you commit a message, you need to add 1 to the offset. Otherwise, you will get that message back. You are committing the offset you want to READ at since it is a topic partition that the library uses behind the scenes.
In an upcoming release I will be adding a commitMessage method that will do the +1 for you, but for now that is the workaround.
Hope that helps
It works, Fine thank you for your help.
Note that if you have high throughput, it's discouraged to commit on each message - commit is not as light as reading data and will make broker spend too much time on updating offset everytime. Commit every 100ms or every 10msg for example.
Most helpful comment
When you commit a message, you need to add 1 to the offset. Otherwise, you will get that message back. You are committing the offset you want to READ at since it is a topic partition that the library uses behind the scenes.
In an upcoming release I will be adding a
commitMessagemethod that will do the +1 for you, but for now that is the workaround.Hope that helps