Hi,
I'm using kafka-node and thanks to v0.5.1 update which resolves this issue.
However, even after update to v0.5.1, rebalance logic seems have some problem.
My situation is,
let client = new kafka.Client(config.kafka.hosts)
let consumer = new kafka.HighLevelConsumer(client, [{
topic: 'someTopic'
}, {
topic: 'otherTopic'
}], {
groupId: 'group-id-production',
autoCommit: true,
autoCommitIntervalMs: 3000
})
Every time I run the servers, one(or two) of my kafka partition does not consumed at all.
(I'm not sure it was happened in previous version(I used v0.3.x before update))
Is there anyone having similar issue with me? or any solution about it?
fetchMaxBytes option.process.on('SIGINT', function () {
highLevelConsumer.close(true, function () {
process.exit();
});
});
0.5.3 release.module.exports = function (app) {
let io = app.get('socketio')
let client = new kafka.Client(config.kafka.hosts)
let highLevelConsumer = initConsumer()
function initConsumer () {
let consumer = new kafka.HighLevelConsumer(client, [ {
topic: 'topic1'
}, {
topic: 'topic2'
} ], {
groupId: 'consumer-group',
autoCommit: true,
autoCommitIntervalMs: 3000
})
consumer.on('message', function (message) {
// do something
})
consumer.on('error', function (err) {
setTimeout(function () {
highLevelConsumer = initConsumer()
}, 30000)
consumer.close(true, function () {})
})
return consumer
}
process.on('SIGINT', function () {
highLevelConsumer.close(true, function () {
process.exit()
})
})
}
module.exports = function (app) {
let io = app.get('socketio')
let client = new kafka.Client(config.kafka.hosts)
let highLevelConsumer = initConsumer()
initConsumerEventListeners()
function initConsumer () {
let consumer = new kafka.HighLevelConsumer(client, [{
topic: 'topic1'
}, {
topic: 'topic2'
}], {
groupId: 'consumer-group',
autoCommit: true,
autoCommitIntervalMs: 3000
})
return consumer
}
function initConsumerEventListeners () {
highLevelConsumer.on('message', function (message) {
// do something
})
highLevelConsumer.on('error', function (err) {
highLevelConsumer.close(function () {
logger.info('Consumer closed inside error event handler')
})
client.close(function () {
logger.info('Client closed inside error event handler')
})
setTimeout(function () {
client = new kafka.Client(config.kafka.hosts)
highLevelConsumer = initConsumer()
initConsumerEventListeners()
}, 30000)
})
}
process.on('SIGINT', function () {
highLevelConsumer.close(true, function () {
process.exit()
})
})
}
Now it works well. Thanks for answering me.
Most helpful comment
Now it works well. Thanks for answering me.