Kafka-node: How to keep nodejs connection when broker is down?

Created on 27 May 2015  ·  7 Comments  ·  Source: SOHU-Co/kafka-node

Hi:

I am using Ubuntu 14.04. I just ran npm install kafka-node and am using kafka_0.9.2-0.8.2.1. I have one Node.js client and two brokers, kafka1 and kafka2. ZooKeeper runs on kafka1. The following code runs in the Node.js client (on node_js_machine). When I kill the Kafka process that Node.js is currently connected to, Node.js crashes and exits. The main problem is in the error handling: if I comment out the 4 lines marked below (the problem lines), Node.js does not crash even when I kill the broker. When the broker that Node.js is connected to goes down, how can I catch this event and reconnect to another broker? Is my error handling correct?

var csUtil        = require('cs-js-common');
var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    KeyedMessage = kafka.KeyedMessage,
    client = new kafka.Client('kafka1:2181/','AppAnalyzerStorm'),
    producer = new Producer(client),
    km = new KeyedMessage('key', 'message');
var kafkaConnected = false;

producer.on('ready',function(){
    kafkaConnected = true;
    log.info("kafka producer is connected");
     console.log("kafka producer is connected");
});


producer.on('error',function(err){
    log.error("error in producer on error"+err);
    console.log("producer error is invoked");

    kafka = require('kafka-node');                                             ===========problem line 1                                      
    Producer = kafka.Producer;                                                ===========problem line 2 
    client = new kafka.Client('kafka1:2181/','AppAnalyzerStorm');  ======problem line 3
    producer = new Producer(client);                                                          ======problem line 4
});


producer.send(xxxx) 

Most helpful comment

Same issue for me when the broker gets restarted.

All 7 comments

There is no need to declare problem lines 1 & 2 again; they can be reused.

kafka = require('kafka-node');                                             ===========problem line 1                                      
    Producer = kafka.Producer;                                                ===========problem line 2 

What I did for connection-loss errors was to set up a timer that re-establishes the connection and sets up the event handlers for Kafka again.

This goes in the producer.on('error'), client.on('error') and client.on('close') handlers:

producer.close();
        client.close(); // Comment out for client on close
        if ( reconnectInterval == null) { // Multiple Error Events may fire, only set one connection retry.
            reconnectInterval =
            setTimeout(function () {
                setupConnectionToKafka();
            }, timeToRetryConnection);
        }

Then, within setupConnectionToKafka:

if ( reconnectInterval != null) {
        clearInterval(reconnectInterval);
        reconnectInterval = null;
    }
    client = new kafka.Client(zkConnectionStr, 'NodeJsProducer', zkOptions);
    producer = new HighLevelProducer(client);
    // re-declare your events.

You can test the reconnection by shutting down Kafka, ZooKeeper, or the VM you are testing on, then bringing it back up shortly afterwards.

Hi: I have the following kafka-node client code, written as instructed. I have two Kafka brokers. When the leading broker process is killed, I can see that the client 'close' event fires, because its log line appears. But the producer 'ready' event is never invoked: I never see the log line ("kafka is ready!!"). Any suggestions? Is there something wrong with my JavaScript code?

var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    KeyedMessage = kafka.KeyedMessage,
    client = new kafka.Client(kafkaZookeeperConnStr,'AppAnalyzerStorm'),
    producer = new Producer(client),
    km = new KeyedMessage('key', 'message');
var timeToRetryConnection = 12*1000; // 12 seconds
var reconnectInterval = null;

producer.on('ready',function(){
    log.info("kafka is ready!!");
    if(reconnectInterval!=null) {
      clearTimeout(reconnectInterval);
      reconnectInterval =null;
    }
});

producer.on('error',function(err){
        producer.close();
        client.close(); // Comment out for client on close
        if ( reconnectInterval == null) { // Multiple Error Events may fire, only set one connection retry.
            reconnectInterval =
            setTimeout(function () {
                    log.info("reconnect is called in producer error event");
                    client = new kafka.Client(kafkaZookeeperConnStr,'AppAnalyzerStorm')
                    producer = new Producer(client);
            }, timeToRetryConnection);
        }
});

client.on('error',function(err){
        kafkaConnected = false;
        producer.close();
        client.close(); // Comment out for client on close
        if ( reconnectInterval == null) { // Multiple Error Events may fire, only set one connection retry.
            reconnectInterval =
            setTimeout(function () {
                   log.info("reconnect is called on client error event");
                  client = new kafka.Client(kafkaZookeeperConnStr,'AppAnalyzerStorm')
                  producer = new Producer(client);
            }, timeToRetryConnection);
        }
});

client.on('close',function(err){
        kafkaConnected = false;
        producer.close();
        if ( reconnectInterval == null) { // Multiple Error Events may fire, only set one connection retry.
            reconnectInterval =
            setTimeout(function () {
                    log.info("reconnect is called on client close");
                    client = new kafka.Client(kafkaZookeeperConnStr,'AppAnalyzerStorm')
                    producer = new Producer(client);
            }, timeToRetryConnection);
        }

});

I found that the producer.on 'ready' handler is never called, yet messages can still be sent successfully to Kafka.
What happens to the producer.on 'ready' event? Why is it never invoked?

producer.on('ready',function(){
    log.info("kafka is ready!!");
    if(reconnectInterval!=null) {
      clearTimeout(reconnectInterval);
      reconnectInterval =null;
    }
});

Hi, in the following code

producer.on('error',function(err){
        producer.close();
        client.close(); // Comment out for client on close
        if ( reconnectInterval == null) { // Multiple Error Events may fire, only set one connection retry.
            reconnectInterval =
            setTimeout(function () {
                    log.info("reconnect is called in producer error event");
                    client = new kafka.Client(kafkaZookeeperConnStr,'AppAnalyzerStorm')
                    producer = new Producer(client);
            }, timeToRetryConnection);
        }
});

you just create a new producer instance in the error handler, but nothing listens for the ready event on that new instance, so of course your handler never receives it.

Same issue for me when the broker gets restarted.

@juhanishen Hi ~ If I create a new client instance in the setTimeout callback function, it seems it is not connected to my event listeners, such as consumer.on('message'). How can I solve this?

{ Error: connect ECONNREFUSED 104.211.246.27:9092
at Object._errnoException (util.js:1024:11)
at _exceptionWithHostPort (util.js:1046:20)
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1182:14)
code: 'ECONNREFUSED',
errno: 'ECONNREFUSED',
syscall: 'connect',
address: '104.211.246.27',
port: 9092 }

Was this page helpful?
0 / 5 - 0 ratings

Related issues

rahul-sr picture rahul-sr  ·  7 Comments

yusufameri picture yusufameri  ·  6 Comments

cesaraugustogarcia picture cesaraugustogarcia  ·  3 Comments

ghinks picture ghinks  ·  6 Comments

comrat picture comrat  ·  5 Comments