Kafka-node: Producer's "ready" event is not triggered

Created on 17 Sep 2017  路  4Comments  路  Source: SOHU-Co/kafka-node

I am creating a producer but "ready" event is not getting triggered after the service is up for a day. I have to restart the producer after which it works fine. This issue is happening on production, so very crucial for me.

Environment

  • Node version: 4.2.0
  • Kafka-node version: 1.6.2
  • Kafka version: 10.2.1

For specific cases also provide

  • Number of Brokers: 2
  • Number partitions for topic: 2

Include Sample Code to reproduce behavior

var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var client = kafka.Client(config.zookeeper.host);

var options = {};

options.requireAcks = 1
options.ackTimeoutMs = 100
options.partitionerType = 1

producer = new HighLevelProducer(client, options);

producer.on("ready", function(err, response) {
       console.log("Producer is ready");
       client.refreshMetadata(["test"], function(err3) {
            if (!err3) {
                        producer.send(payload, function(err2, result) {
                        }
            }
       }
}
producer.on("error", function (err) {
       console.log("Error in initialization");
});

Ready event is not getting fired becuase of which the message is never pushed to kafka broker.

Thanks for your contribution!

Most helpful comment

Kafka is phasing out dependency on Zookeeper at least from their client interfaces. You could try using the new KafkaClient instead which does not use zookeeper to discover the kafka cluster.

All 4 comments

ready event is only triggered from the producer.connect() method and by default it's called in the constructor of the Producer.

Same happening for me in a evaluation / performance test that i execute against kafka... This block of code succeeds around 60...70 times then the 'ready' event does not get triggered anymore.

`
constructor(params) {
this.id = params.id
this.msgCount = params.msgCount,
this.zookeeperHost = params.zookeeperHost
this.maxParallelInserts = 3
this.msgSize = params.msgSize

    this.lastPulledEvent,
    this.defaultStats = {
        inserted: 0
    }
    this.msgId = 0
    this.stats = _.cloneDeep(this.defaultStats)
    this.client = new kafka.Client(this.zookeeperHost)
    this.producer = new Producer(this.client)
}

start(cb) {
    let self = this
    console.log(this.id, 'preparing producer')
    this.producer.on('ready', () => {
        console.log(this.id, 'starting flow')
        this.runFlow(cb)
    })
    this.producer.on('error', (err) => {
        console.log('producer error', err);
    })
}`

Log output, from the point where it gets stalled

59 'preparing producer' worker 59 started { inserted: 2000 } 59 'starting flow' 59 'sending messages' 57 'messages sent, ' null { pipeTest: { '0': 1756000 } } finished 57 undefined 60 'preparing producer' worker 60 started { inserted: 1000 } 58 'messages sent, ' null { pipeTest: { '0': 1757000 } } finished 58 undefined 61 'preparing producer' worker 61 started { inserted: 1000 } 59 'messages sent, ' null { pipeTest: { '0': 1758000 } } finished 59 undefined 62 'preparing producer' worker 62 started { inserted: 1000 }

from there onwards it's just silence.. the flow never gets triggered...

I got this resolved by closing the producer and connection after the payload is sent.

Try this code snippet.

var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var client = kafka.Client(config.zookeeper.host);

var options = {};

options.requireAcks = 1
options.ackTimeoutMs = 100
options.partitionerType = 1

producer = new HighLevelProducer(client, options);

producer.on("ready", function(err, response) {
       console.log("Producer is ready");
       client.refreshMetadata(["test"], function(err3) {
            if (!err3) {
                        producer.send(payload, function(err2, result) {
                                  producer.close();
                                  client.close();
                        }
            }
       }
}
producer.on("error", function (err) {
       console.log("Error in initialization");
});

I guess the reason is zookeeper allowed only 60 producer connections which if not closed are remain active and thus results in blocker for the new connections.

This thing should be added in the documentation.

Kafka is phasing out dependency on Zookeeper at least from their client interfaces. You could try using the new KafkaClient instead which does not use zookeeper to discover the kafka cluster.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

yusufameri picture yusufameri  路  6Comments

chetandev picture chetandev  路  5Comments

comrat picture comrat  路  5Comments

Sonivaibhav26 picture Sonivaibhav26  路  5Comments

aamitsharma2705 picture aamitsharma2705  路  4Comments