Kafka-node: Error handling on message reception

Created on 19 Mar 2015 · 8 Comments · Source: SOHU-Co/kafka-node

Sorry if this comes off as a noob-ish question, but I've only recently started working with Kafka.

How would I go about an implementation that only moves the offset if the message was processed correctly?
Currently on('message') just gives me a firehose, with no way to handle errors in the processing of messages.

All 8 comments

There is a way.

  1. First you turn off autoCommit by setting it to false when creating a consumer.
  2. Process each message in the on('message') handler.
  3. When done, call consumer.commit(function callback(err, data) {});.

Easy as pie.
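A minimal sketch of those three steps, for illustration only. It runs against a stand-in consumer rather than a real broker; `commitAfterProcessing`, `FakeConsumer`, the handler, and `onError` are hypothetical names, and the only calls assumed on the consumer are on('message') and commit(force, callback), matching the commit signature quoted later in this thread. Judging by that quoted code, commit() sends whatever offsets the consumer is currently tracking, not the offset of one specific message, so this alone does not give per-message control.

```javascript
const EventEmitter = require('events');

// Commit only after the application handler reports success.
// handler(message, done) and onError(err, message) are hypothetical callbacks.
function commitAfterProcessing(consumer, handler, onError) {
  consumer.on('message', function (message) {
    handler(message, function (err) {
      if (err) return onError(err, message); // failed: do not commit
      consumer.commit(true, function (commitErr) {
        if (commitErr) onError(commitErr, message);
      });
    });
  });
}

// Stand-in for a real consumer so the sketch runs without Kafka.
class FakeConsumer extends EventEmitter {
  constructor() {
    super();
    this.commits = 0;
  }
  commit(force, cb) {
    this.commits += 1;
    cb(null, 'ok');
  }
}

const fake = new FakeConsumer();
const failures = [];

commitAfterProcessing(
  fake,
  function (message, done) {
    done(message.value === 'bad' ? new Error('cannot process') : null);
  },
  function (err, message) {
    failures.push(message.offset);
  }
);

fake.emit('message', { offset: 0, value: 'good' });
fake.emit('message', { offset: 1, value: 'bad' });

console.log('commits:', fake.commits, 'failed offsets:', failures);
```

With a real consumer you would create it with autoCommit set to false and pass it in place of the fake one.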

@kadishmal Does that work for HighLevelConsumers as well?
I'm not sure I'm getting this right; even with a non-high-level consumer it will get 10+ messages before committing.
Is there any way to limit it to a single message at a time without specifying an exact byte size?

There is no good way to handle this. We can't know how many messages will be returned by one fetch request; it depends on the consumer's fetchMaxBytes option and on the size of each message. Here are a couple of approaches I can think of:

  1. Maintain an in-memory queue in your application code, queue the messages you receive, and process them one by one. If a message is processed successfully, commit the offset with the Offset.commit() API. Since an offset commit request is a little slow, you may not want to commit after every processed message. If processing a message fails, then:

    • stop queueing newly arriving messages

    • pause the consumer with consumer.pause()

    • set the current offset to the last successfully processed offset with consumer.setOffset

    • resume the consumer with consumer.resume

    • note that this approach can add overhead.

  2. If message order does not matter, you can simply send the failed message back to Kafka and consume it again.

Hope it helps; please share your thoughts here.
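Option 1 could be sketched roughly as follows. This is an illustration under assumptions, not a tested recipe: `createSequentialProcessor`, `FakeConsumer`, and the handler are hypothetical, the consumer is a stand-in that only records calls, and the sketch reads "last successfully processed offset" as "rewind so that the failed message is fetched again". Committing is left out to keep it short.

```javascript
const EventEmitter = require('events');

function createSequentialProcessor(consumer, handler) {
  const queue = [];     // messages waiting to be processed
  const resumeAt = {};  // "topic:partition" -> offset expected after a rewind
  let busy = false;

  function key(m) {
    return m.topic + ':' + m.partition;
  }

  function rewind(failed) {
    // Drop queued messages from the same partition; they will be refetched.
    for (let i = queue.length - 1; i >= 0; i--) {
      if (key(queue[i]) === key(failed)) queue.splice(i, 1);
    }
    resumeAt[key(failed)] = failed.offset;
    consumer.pause();
    consumer.setOffset(failed.topic, failed.partition, failed.offset);
    consumer.resume();
  }

  function next() {
    if (busy || queue.length === 0) return;
    busy = true;
    const message = queue.shift();
    handler(message, function (err) {
      busy = false;
      if (err) rewind(message);
      next();
    });
  }

  consumer.on('message', function (message) {
    const k = key(message);
    if (k in resumeAt) {
      // Ignore stale deliveries until the rewound message shows up again.
      if (message.offset !== resumeAt[k]) return;
      delete resumeAt[k];
    }
    queue.push(message);
    next();
  });
}

// Stand-in consumer that records the calls made on it.
class FakeConsumer extends EventEmitter {
  constructor() {
    super();
    this.calls = [];
  }
  pause() { this.calls.push('pause'); }
  resume() { this.calls.push('resume'); }
  setOffset(topic, partition, offset) { this.calls.push('setOffset:' + offset); }
}

const fake = new FakeConsumer();
const processed = [];
let failedOnce = false;

createSequentialProcessor(fake, function (message, done) {
  if (message.offset === 1 && !failedOnce) {
    failedOnce = true;
    return done(new Error('transient failure'));
  }
  processed.push(message.offset);
  done(null);
});

// First delivery: offset 1 fails, offset 2 is ignored as stale.
// Second delivery simulates the refetch after the rewind.
[0, 1, 2, 1, 2].forEach(function (offset) {
  fake.emit('message', { topic: 't', partition: 0, offset: offset });
});

console.log(processed, fake.calls);
```

A message that always fails would be refetched forever with this approach, so a retry limit or a fallback such as option 2 is probably needed in practice.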

I decided to go with an in-process/memory queue. Message order is slightly important, but re-processing is not an issue.

I could obviously set fetchMaxBytes lower, but the messages in this topic will have variable sizes and I'm not sure how large they will be yet.

I'm doing something like this:

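// `client` and `queue` are created elsewhere; `queue` is the in-process
// work queue whose drain hook fires once every queued item is processed.
var consumer = new HighLevelConsumer(
  client,
  [
    {
      topic: 'topic'
    }
  ],
  {
    groupId: 'group',
    paused: true,      // intended to start paused; resumed by the timer below
    autoCommit: false  // offsets are committed manually in queue.drain
  }
);

// Skip the first drain event; on later ones, commit once the batch is done.
var firstDrain = true;
queue.drain = function () {
  if (!firstDrain) {
    console.log('all items have been processed');
    consumer.commit(function callback(err, data) {
      console.log('committed');
    });
  }
  firstDrain = false;
};

// Hand every incoming message to the queue instead of processing it inline.
consumer.on('message', function (message) {
  queue.push({
    stories: JSON.parse(message.value)
  });
});

setTimeout(function () {
  consumer.resume();
}, 50);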

Should that work? For now I can deal with not having error handling; I just don't want more messages until I've processed this batch.

Seems OK, but why should the consumer be paused when it is created?

@haio That was mostly debugging, I suppose :)

I'll try and work with this, see how it turns out :) Thanks for the responses.

@kadishmal The commit method doesn't work like that. Here is the code:
(The HighLevelConsumer code is almost the same.)

function autoCommit(force, cb) {
    if (this.committing && !force) return cb(null, 'Offset committing');

    this.committing = true;
    setTimeout(function () {
        this.committing = false;
    }.bind(this), this.options.autoCommitIntervalMs);

    var payloads = this.payloads;
    if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);

    var commits = payloads.filter(function (p) { return p.offset !== 0 });
    if (commits.length) {
        this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
    } else {
        cb(null, 'Nothing to be committed');
    }
}

Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;

Your code ends up making force truthy, because the callback lands in the force parameter. I don't know whether that's right or not. Maybe it's a different method?


Never mind. After testing it many times, it works. But I suggest using commit(true, callback);
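To make the difference concrete, here is a small sketch that runs the function exactly as quoted above against a stub. `makeStub` and its fields are hypothetical, the stub client only records requests, and the released library may handle its arguments differently from this excerpt.

```javascript
// The function as quoted in this thread, unchanged.
function autoCommit(force, cb) {
    if (this.committing && !force) return cb(null, 'Offset committing');

    this.committing = true;
    setTimeout(function () {
        this.committing = false;
    }.bind(this), this.options.autoCommitIntervalMs);

    var payloads = this.payloads;
    if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);

    var commits = payloads.filter(function (p) { return p.offset !== 0 });
    if (commits.length) {
        this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
    } else {
        cb(null, 'Nothing to be committed');
    }
}

// Hypothetical stub with just the fields the quoted function touches.
function makeStub() {
  var sent = [];
  return {
    committing: false,
    options: { groupId: 'group', autoCommitIntervalMs: 10 },
    payloads: [{ topic: 't', partition: 0, offset: 42 }],
    client: {
      sendOffsetCommitRequest: function (groupId, commits, cb) {
        sent.push({ groupId: groupId, commits: commits, cb: cb });
        if (cb) cb(null, 'ok');
      }
    },
    sent: sent,
    commit: autoCommit
  };
}

// 1. force = false: a second call inside the interval is skipped.
// 2. force = true: the commit request is sent regardless.
var results = [];
var record = function (err, data) { results.push(data); };
var stub = makeStub();
stub.commit(false, record);
stub.commit(false, record);
stub.commit(true, record);

// 3. Single-argument call: the callback is bound to `force`, `cb` is undefined.
var single = makeStub();
var singleArgCallbackRan = false;
single.commit(function () { singleArgCallbackRan = true; });

console.log(results, stub.sent.length, single.sent.length, singleArgCallbackRan);
```

In this excerpt the single-argument call still sends the commit request, which would fit the observation that it works, but the callback itself is never used as cb. Passing commit(true, callback) avoids that ambiguity.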
