Sorry if this comes off as a noob-ish question, but I've only recently started working with Kafka.
How would I go about an implementation that only moves the offset if the message was processed correctly?
Currently on('message') just gives me a firehose, with no way to handle errors that occur while processing messages.
There is a way.
1. Disable autoCommit by setting it to false when creating the consumer.
2. In on('message'), process the message.
3. Call consumer.commit(function callback(err, data) {});
Easy as pie.
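A minimal sketch of those three steps, assuming a kafka-node style consumer that emits 'message' events and exposes commit(force, cb) as in the source excerpt later in this thread (the sketch passes true for force, as suggested there). The function name commitAfterProcessing and the synchronous, throwing processMessage callback are hypothetical; the consumer is passed in so the wiring can be tried against a stub instead of a live broker.

```javascript
// Commit offsets only after a message has been processed successfully.
// Assumes a kafka-node style consumer created with autoCommit: false, e.g.
//   new kafka.Consumer(client, [{ topic: 'topic' }], { autoCommit: false })
// processMessage (hypothetical) is synchronous and throws on failure.
function commitAfterProcessing(consumer, processMessage, onError) {
  consumer.on('message', function (message) {
    try {
      processMessage(message);
    } catch (err) {
      // Processing failed: skip the commit so the offset is not moved on here.
      onError(err, message);
      return;
    }
    // true = force, which skips the autoCommitIntervalMs throttle in commit().
    consumer.commit(true, function (err) {
      if (err) onError(err, message);
    });
  });
}
```

One caveat: commit() sends the offsets held in the consumer's payloads (see the excerpt later in the thread), not the offset of one particular message, so a later successful commit may still move past a message that failed. Rewinding with setOffset, as suggested further down, is aimed at that case.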
@kadishmal Does that work for HighLevelConsumers as well?
I'm not sure I'm getting this right: even with a non-high-level consumer, it receives 10+ messages before committing.
Is there any way to limit it to a single message at a time without specifying an exact byte size?
There is no good way to handle this. We can't control how many messages are returned by one fetch request; that depends on the consumer's fetchMaxBytes option and the size of each message. Here are a few approaches I can think of:
- Commit offsets yourself with the Offset.commit() API. Since an offset commit request is a little slow, you may not want to commit after every processed message.
- If processing a message fails, then:
  1. consumer.pause()
  2. consumer.setOffset() back to the failed message's offset
  3. consumer.resume()
Hope it helps; please share your thoughts here.
I decided to go with an in-process/memory queue. Message order is slightly important, but re-processing is not an issue.
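The pause/setOffset/resume suggestion, combined with committing only every few messages because commit requests are slow, might look like the sketch below. It assumes kafka-node style pause(), resume(), setOffset(topic, partition, offset) and commit(force, cb) methods, and messages that carry topic, partition and offset; retryOnFailure, processMessage and commitEvery are hypothetical names. Because messages that were already fetched may still be emitted after the rewind, the sketch skips them until the failed offset comes back.

```javascript
// Rewind to a failed message (pause -> setOffset -> resume) so it is fetched
// again, and commit only every `commitEvery` successes to limit slow commits.
// Assumed consumer methods: pause(), resume(), setOffset(topic, partition, offset),
// commit(force, cb). processMessage (hypothetical) is synchronous and throws on failure.
function retryOnFailure(consumer, processMessage, commitEvery) {
  var pending = Object.create(null); // "topic:partition" -> offset awaiting redelivery
  var processed = 0;                 // successes since the last commit

  consumer.on('message', function (message) {
    var key = message.topic + ':' + message.partition;

    // After a rewind, skip the rest of the old batch until the failed offset returns.
    if (key in pending && message.offset !== pending[key]) return;
    delete pending[key];

    try {
      processMessage(message);
    } catch (err) {
      pending[key] = message.offset;
      consumer.pause();
      consumer.setOffset(message.topic, message.partition, message.offset);
      consumer.resume();
      return;
    }

    processed += 1;
    if (processed >= commitEvery) {
      processed = 0;
      consumer.commit(true, function (err) {
        if (err) console.error('offset commit failed', err);
      });
    }
  });
}
```

A message that always fails is retried forever here; a real consumer would probably cap the retries or set the message aside. The skip logic also assumes that the rewound fetch does redeliver the failed offset.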
I could obviously set fetchMaxBytes lower, but the messages in this topic will have variable sizes and I'm not sure how large they will be yet.
I'm doing something like this:
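```javascript
var kafka = require('kafka-node');
var async = require('async');

// Assumed setup (not shown in the original): a ZooKeeper-based client and an
// async.queue that processes one task at a time.
var client = new kafka.Client('localhost:2181');
var queue = async.queue(function (task, done) {
  // ... process task.stories here ...
  done();
}, 1);

var consumer = new kafka.HighLevelConsumer(
  client,
  [
    { topic: 'topic' }
  ],
  {
    groupId: 'group',
    paused: true,
    autoCommit: false
  }
);

var firstDrain = true;
queue.drain = function () {
  if (!firstDrain) {
    console.log('all items have been processed');
    consumer.commit(function callback(err, data) {
      console.log('committed');
    });
  }
  firstDrain = false;
};

consumer.on('message', function (message) {
  queue.push({
    stories: JSON.parse(message.value)
  });
});

setTimeout(function () {
  consumer.resume();
}, 50);
```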
Should that work? For now I can live without error handling; I just don't want more messages until I've processed this batch.
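For the "no more messages until this batch is processed" goal, one option is to pause the consumer as soon as a message arrives and resume it only after the in-memory queue drains and the offset has been committed. This sketch replaces async.queue with a plain array so it stays self-contained; batchThroughQueue and the asynchronous worker(message, done) are hypothetical, and it assumes kafka-node style pause(), resume() and commit(force, cb). Pausing is assumed to stop new fetch requests, while messages from a fetch already in flight may still arrive; they are queued as part of the same batch.

```javascript
// Process one fetched batch at a time through an in-memory queue.
// Assumed consumer methods: pause(), resume(), commit(force, cb).
// worker(message, done) is a hypothetical async processor.
function batchThroughQueue(consumer, worker) {
  var queue = [];
  var busy = false;

  function next() {
    if (queue.length === 0) {
      busy = false;
      // Whole batch handled: move the offset, then ask for more messages.
      consumer.commit(true, function (err) {
        if (err) console.error('offset commit failed', err);
        consumer.resume(); // resume regardless so the consumer does not stall
      });
      return;
    }
    var message = queue.shift();
    worker(message, function (err) {
      if (err) console.error('processing failed at offset', message.offset, err);
      next();
    });
  }

  consumer.on('message', function (message) {
    consumer.pause(); // stop further fetches while this batch is in progress
    queue.push(message);
    if (!busy) {
      busy = true;
      // Defer the start so messages emitted in the same tick join this batch.
      setImmediate(next);
    }
  });
}
```

As in the original code, a failed message is only logged here and the whole batch is still committed.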
Seems OK, but why should the consumer be paused when it's created?
@haio That was mostly for debugging, I suppose :)
I'll try working with this and see how it turns out :) Thanks for the responses.
@kadishmal The commit method doesn't work like that. Here is the code:
(The HighLevelConsumer code is almost the same.)
```javascript
function autoCommit(force, cb) {
  if (this.committing && !force) return cb(null, 'Offset committing');

  this.committing = true;
  setTimeout(function () {
    this.committing = false;
  }.bind(this), this.options.autoCommitIntervalMs);

  var payloads = this.payloads;
  if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);

  var commits = payloads.filter(function (p) { return p.offset !== 0; });
  if (commits.length) {
    this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
  } else {
    cb(null, 'Nothing to be committed');
  }
}

Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;
```
Your code makes force true: the callback function is passed as the force argument, so cb is left undefined. I don't know whether that's right. Maybe you meant another method?
Never mind, after testing it many times, it works. But I suggest using commit(true, callback);
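To see what force changes, here is a stripped-down, runnable model of the commit() excerpt with a stub client. It is not kafka-node itself: makeConsumer, the fixed payload and the stub client are hypothetical, and only the throttle and argument handling are kept. In the last call the callback lands in force, so cb is undefined when the request is sent; the stub simply skips a missing callback, and what the real client does in that case is not shown in this thread.

```javascript
// Hypothetical model of the commit() excerpt: same throttle, stub client.
function makeConsumer(client, intervalMs) {
  return {
    committing: false,
    payloads: [{ topic: 'topic', partition: 0, offset: 5 }],
    commit: function (force, cb) {
      // Without force, a commit inside the interval is refused.
      if (this.committing && !force) return cb(null, 'Offset committing');
      this.committing = true;
      // The flag is cleared after the interval, as in the excerpt.
      setTimeout(function () { this.committing = false; }.bind(this), intervalMs);
      var commits = this.payloads.filter(function (p) { return p.offset !== 0; });
      if (commits.length) {
        client.sendOffsetCommitRequest('group', commits, cb);
      } else {
        cb(null, 'Nothing to be committed');
      }
    }
  };
}
```

Calling commit(true, cb), then commit(false, cb) within the interval, then commit(true, cb) again sends, refuses with 'Offset committing', and sends; commit(cb) also sends, but without a callback.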