I am creating a producer but "ready" event is not getting triggered after the service is up for a day. I have to restart the producer after which it works fine. This issue is happening on production, so very crucial for me.
var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var client = kafka.Client(config.zookeeper.host);
var options = {};
options.requireAcks = 1
options.ackTimeoutMs = 100
options.partitionerType = 1
producer = new HighLevelProducer(client, options);
producer.on("ready", function(err, response) {
console.log("Producer is ready");
client.refreshMetadata(["test"], function(err3) {
if (!err3) {
producer.send(payload, function(err2, result) {
}
}
}
}
producer.on("error", function (err) {
console.log("Error in initialization");
});
Ready event is not getting fired becuase of which the message is never pushed to kafka broker.
Thanks for your contribution!
ready event is only triggered from the producer.connect() method and by default it's called in the constructor of the Producer.
Same happening for me in a evaluation / performance test that i execute against kafka... This block of code succeeds around 60...70 times then the 'ready' event does not get triggered anymore.
`
constructor(params) {
this.id = params.id
this.msgCount = params.msgCount,
this.zookeeperHost = params.zookeeperHost
this.maxParallelInserts = 3
this.msgSize = params.msgSize
this.lastPulledEvent,
this.defaultStats = {
inserted: 0
}
this.msgId = 0
this.stats = _.cloneDeep(this.defaultStats)
this.client = new kafka.Client(this.zookeeperHost)
this.producer = new Producer(this.client)
}
start(cb) {
let self = this
console.log(this.id, 'preparing producer')
this.producer.on('ready', () => {
console.log(this.id, 'starting flow')
this.runFlow(cb)
})
this.producer.on('error', (err) => {
console.log('producer error', err);
})
}`
Log output, from the point where it gets stalled
59 'preparing producer'
worker 59 started
{ inserted: 2000 }
59 'starting flow'
59 'sending messages'
57 'messages sent, ' null { pipeTest: { '0': 1756000 } }
finished 57 undefined
60 'preparing producer'
worker 60 started
{ inserted: 1000 }
58 'messages sent, ' null { pipeTest: { '0': 1757000 } }
finished 58 undefined
61 'preparing producer'
worker 61 started
{ inserted: 1000 }
59 'messages sent, ' null { pipeTest: { '0': 1758000 } }
finished 59 undefined
62 'preparing producer'
worker 62 started
{ inserted: 1000 }
from there onwards it's just silence.. the flow never gets triggered...
I got this resolved by closing the producer and connection after the payload is sent.
Try this code snippet.
var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var client = kafka.Client(config.zookeeper.host);
var options = {};
options.requireAcks = 1
options.ackTimeoutMs = 100
options.partitionerType = 1
producer = new HighLevelProducer(client, options);
producer.on("ready", function(err, response) {
console.log("Producer is ready");
client.refreshMetadata(["test"], function(err3) {
if (!err3) {
producer.send(payload, function(err2, result) {
producer.close();
client.close();
}
}
}
}
producer.on("error", function (err) {
console.log("Error in initialization");
});
I guess the reason is zookeeper allowed only 60 producer connections which if not closed are remain active and thus results in blocker for the new connections.
This thing should be added in the documentation.
Kafka is phasing out dependency on Zookeeper at least from their client interfaces. You could try using the new KafkaClient instead which does not use zookeeper to discover the kafka cluster.
Most helpful comment
Kafka is phasing out dependency on Zookeeper at least from their client interfaces. You could try using the new
KafkaClientinstead which does not use zookeeper to discover the kafka cluster.