Kafka-node: HighLevelConsumer rebalance logic for offsets does not work properly in v0.5.1

Created on 29 Jul 2016  ·  2 Comments  ·  Source: SOHU-Co/kafka-node

Hi,
I'm using kafka-node, and thanks for the v0.5.1 update, which resolves this issue.
However, even after updating to v0.5.1, the rebalance logic still seems to have a problem.
My situation is,

  • Kafka server

    • 6 partitions for a topic (Kafka version is 0.9.x)

  • Kafka client(Application server)

    • 4 servers, each running 2 Node instances (using pm2) - 8 Node server instances in total

  • code for kafka client connection
// Subscribe to both topics as a member of one consumer group; auto-commit is on with a 3000 ms interval.
const topicPayloads = ['someTopic', 'otherTopic'].map(function (name) {
  return { topic: name }
})
const consumerOptions = {
  groupId: 'group-id-production',
  autoCommit: true,
  autoCommitIntervalMs: 3000
}
let client = new kafka.Client(config.kafka.hosts)
let consumer = new kafka.HighLevelConsumer(client, topicPayloads, consumerOptions)

Every time I start the servers, one (or two) of my Kafka partitions is not consumed at all.
(I'm not sure whether this also happened in the previous version; I used v0.3.x before the update.)

Is anyone else having a similar issue, or does anyone know a solution?

Most helpful comment

  • I think the problem was the error-handling logic for the consumer instance in my code

    • my code was

// Version that misbehaved, as originally posted: the consumer rebuilds itself on
// error while reusing the one `client` created below.
module.exports = function (app) {
  let io = app.get('socketio') // not referenced again within this snippet
  // Created once; every consumer built by initConsumer() shares this client.
  let client = new kafka.Client(config.kafka.hosts)
  let highLevelConsumer = initConsumer()

  // Builds a HighLevelConsumer for topic1/topic2 on the shared `client`,
  // attaches its message/error handlers, and returns it.
  function initConsumer () {
    let consumer = new kafka.HighLevelConsumer(client, [ {
      topic: 'topic1'
    }, {
      topic: 'topic2'
    } ], {
      groupId: 'consumer-group',
      autoCommit: true,
      autoCommitIntervalMs: 3000
    })

    consumer.on('message', function (message) {
      // do something
    })

    consumer.on('error', function (err) {
      // `err` is neither logged nor inspected.
      // Schedule a replacement consumer 30 s from now (still on the same client)...
      setTimeout(function () {
        highLevelConsumer = initConsumer()
      }, 30000)
      // ...and close the failing one (first argument `true`); the empty callback
      // ignores whatever close() reports.
      consumer.close(true, function () {})
    })
    return consumer
  }

  // On SIGINT (Ctrl-C), close whichever consumer is current, then exit the process.
  process.on('SIGINT', function () {
    highLevelConsumer.close(true, function () {
      process.exit()
    })
  })
}
  • After debugging, I found that a 'NODE_EXISTS' error always occurred right after restarting the server,
  • and the consumer that threw the error was not closed and remained in the consumer list.
  • So I changed my code as follows:
module.exports = function (app) {
  let io = app.get('socketio')
  let client = new kafka.Client(config.kafka.hosts)
  let highLevelConsumer = initConsumer()
  initConsumerEventListeners()

  function initConsumer () {
    let consumer = new kafka.HighLevelConsumer(client, [{
      topic: 'topic1'
    }, {
      topic: 'topic2'
    }], {
      groupId: 'consumer-group',
      autoCommit: true,
      autoCommitIntervalMs: 3000
    })
    return consumer
  }

  function initConsumerEventListeners () {
    highLevelConsumer.on('message', function (message) {
      // do something
    })

    highLevelConsumer.on('error', function (err) {
      highLevelConsumer.close(function () {
        logger.info('Consumer closed inside error event handler')
      })
      client.close(function () {
        logger.info('Client closed inside error event handler')
      })
      setTimeout(function () {
        client = new kafka.Client(config.kafka.hosts)
        highLevelConsumer = initConsumer()
        initConsumerEventListeners()
      }, 30000)
    })
  }

  process.on('SIGINT', function () {
    highLevelConsumer.close(true, function () {
      process.exit()
    })
  })
}

Now it works well. Thanks for answering me.

All 2 comments

  • Make sure the messages you're trying to consume don't exceed the default maximum message size (which I believe is around 1 MB). If they do, you can adjust the limit by updating the fetchMaxBytes option.
  • Also make sure you are closing your consumers for the CTRL-C case with something like this:
// On SIGINT (Ctrl-C), close the consumer first and exit only from its close callback.
process.on('SIGINT', () => {
    highLevelConsumer.close(true, () => process.exit());
});
  • Lastly, there are some outstanding HLC rebalancing issues that I hope to address in the 0.5.3 release.
  • I think the problem was the error-handling logic for the consumer instance in my code

    • my code was

// Version that misbehaved, as originally posted: the consumer rebuilds itself on
// error while reusing the one `client` created below.
module.exports = function (app) {
  let io = app.get('socketio') // not referenced again within this snippet
  // Created once; every consumer built by initConsumer() shares this client.
  let client = new kafka.Client(config.kafka.hosts)
  let highLevelConsumer = initConsumer()

  // Builds a HighLevelConsumer for topic1/topic2 on the shared `client`,
  // attaches its message/error handlers, and returns it.
  function initConsumer () {
    let consumer = new kafka.HighLevelConsumer(client, [ {
      topic: 'topic1'
    }, {
      topic: 'topic2'
    } ], {
      groupId: 'consumer-group',
      autoCommit: true,
      autoCommitIntervalMs: 3000
    })

    consumer.on('message', function (message) {
      // do something
    })

    consumer.on('error', function (err) {
      // `err` is neither logged nor inspected.
      // Schedule a replacement consumer 30 s from now (still on the same client)...
      setTimeout(function () {
        highLevelConsumer = initConsumer()
      }, 30000)
      // ...and close the failing one (first argument `true`); the empty callback
      // ignores whatever close() reports.
      consumer.close(true, function () {})
    })
    return consumer
  }

  // On SIGINT (Ctrl-C), close whichever consumer is current, then exit the process.
  process.on('SIGINT', function () {
    highLevelConsumer.close(true, function () {
      process.exit()
    })
  })
}
  • After debugging, I found that a 'NODE_EXISTS' error always occurred right after restarting the server,
  • and the consumer that threw the error was not closed and remained in the consumer list.
  • So I changed my code as follows:
module.exports = function (app) {
  let io = app.get('socketio')
  let client = new kafka.Client(config.kafka.hosts)
  let highLevelConsumer = initConsumer()
  initConsumerEventListeners()

  function initConsumer () {
    let consumer = new kafka.HighLevelConsumer(client, [{
      topic: 'topic1'
    }, {
      topic: 'topic2'
    }], {
      groupId: 'consumer-group',
      autoCommit: true,
      autoCommitIntervalMs: 3000
    })
    return consumer
  }

  function initConsumerEventListeners () {
    highLevelConsumer.on('message', function (message) {
      // do something
    })

    highLevelConsumer.on('error', function (err) {
      highLevelConsumer.close(function () {
        logger.info('Consumer closed inside error event handler')
      })
      client.close(function () {
        logger.info('Client closed inside error event handler')
      })
      setTimeout(function () {
        client = new kafka.Client(config.kafka.hosts)
        highLevelConsumer = initConsumer()
        initConsumerEventListeners()
      }, 30000)
    })
  }

  process.on('SIGINT', function () {
    highLevelConsumer.close(true, function () {
      process.exit()
    })
  })
}

Now it works well. Thanks for answering me.

Was this page helpful?
0 / 5 - 0 ratings