Hi:
I am using Ubuntu 14.04. I installed kafka-node with npm and am using kafka_0.9.2-0.8.2.1. I have one Node.js client and two brokers, kafka1 and kafka2; ZooKeeper runs on kafka1. The following code is the Node.js client (which runs on node_js_machine). When I kill the Kafka process that the Node.js client is currently connected to, the Node.js process crashes and exits. The main problem is in the error handling: if I comment out the 4 lines marked below (problem lines), then Node.js does not crash even when I kill the broker. When the broker that Node.js is connected to goes down, how can I catch this event and reconnect to another broker? Is my error handling correct?
// Minimal kafka-node producer script from the question.
// NOTE(review): `log` is used below but is not defined in this snippet;
// presumably it comes from elsewhere in the poster's code.
var csUtil = require('cs-js-common');
// The client is created from the connection string 'kafka1:2181/' with the
// client id 'AppAnalyzerStorm'; the producer is built on top of that client.
var kafka = require('kafka-node'),
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
client = new kafka.Client('kafka1:2181/','AppAnalyzerStorm'),
producer = new Producer(client),
km = new KeyedMessage('key', 'message');
// Flag set to true once the producer reports 'ready'; nothing in this snippet
// ever sets it back to false.
var kafkaConnected = false;
// 'ready' handler: record the connected state and log it.
producer.on('ready',function(){
kafkaConnected = true;
log.info("kafka producer is connected");
console.log("kafka producer is connected");
});
// 'error' handler: log the error, then replace the module-level client and
// producer with brand-new instances.
producer.on('error',function(err){
log.error("error in producer on error"+err);
console.log("producer error is invoked");
// The four lines below are the ones the poster marked as "problem lines".
// No 'ready' or 'error' listener is registered on the new producer or client
// here, and the previous producer/client are not closed before being replaced.
kafka = require('kafka-node'); ===========problem line 1
Producer = kafka.Producer; ===========problem line 2
client = new kafka.Client('kafka1:2181/','AppAnalyzerStorm'); ======problem line 3
producer = new Producer(client); ======problem line 4
});
// Placeholder from the original post: `xxxx` stands for the payload to send.
producer.send(xxxx)
There is no need to declare lines 1 & 2 again; they can be reused:
kafka = require('kafka-node'); ===========problem line 1
Producer = kafka.Producer; ===========problem line 2
What I did for connection-loss errors was to set up a timer that re-establishes the connection and sets up the event handlers for Kafka again.
Use the following in the handlers for producer.on('error'), client.on('error') and client.on('close'):
// Shared body for the producer 'error', client 'error' and client 'close'
// handlers: close the current connection objects and schedule one reconnect.
producer.close();
client.close(); // Omit this call inside the client 'close' handler.
// Several error events may fire for one outage; the null check ensures only
// one reconnect timer is pending at a time.
if ( reconnectInterval == null) { // Multiple Error Events may fire, only set one connection retry.
// Despite its name, reconnectInterval holds a one-shot setTimeout handle.
reconnectInterval =
setTimeout(function () {
setupConnectionToKafka();
}, timeToRetryConnection);
}
Then, within setupConnectionToKafka:
// Body of setupConnectionToKafka: cancel any pending reconnect timer, then
// build a fresh client and producer.
if (reconnectInterval != null) {
  // The handle was created with setTimeout, so cancel it with clearTimeout
  // (not clearInterval) to keep the timer API calls paired.
  clearTimeout(reconnectInterval);
  reconnectInterval = null;
}
client = new kafka.Client(zkConnectionStr, 'NodeJsProducer', zkOptions);
producer = new HighLevelProducer(client);
// Re-attach your event handlers here: `client` and `producer` are new
// instances at this point, and nothing above registers any listener on them.
You can test the reconnection by shutting down Kafka, ZooKeeper, or the VM you are testing on, and then bringing it back up shortly afterwards.
Hi: I have the following kafka-node client code, written as instructed. I have two Kafka brokers. When the leading broker process is killed, I can see that the client 'close' event fires, because the corresponding log line appears. However, the producer 'ready' event is never invoked: I never see the log line "kafka is ready!!". Any suggestions? Is there something wrong with my JavaScript code?
// Second version of the producer script: on producer/client failure it closes
// the connection objects and schedules a single delayed reconnect.
// NOTE(review): `log`, `kafkaZookeeperConnStr` and `kafkaConnected` are used
// below but are not declared in this snippet.
var kafka = require('kafka-node'),
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
client = new kafka.Client(kafkaZookeeperConnStr,'AppAnalyzerStorm'),
producer = new Producer(client),
km = new KeyedMessage('key', 'message');
var timeToRetryConnection = 12*1000; // 12 seconds
// Handle of the pending reconnect timer (a setTimeout handle, despite the
// name); null means no reconnect is currently scheduled.
var reconnectInterval = null;
// 'ready' handler: log and cancel/reset any pending reconnect timer.
// It is registered only on the producer instance created above.
producer.on('ready',function(){
log.info("kafka is ready!!");
if(reconnectInterval!=null) {
clearTimeout(reconnectInterval);
reconnectInterval =null;
}
});
// Producer 'error' handler: close producer and client, then schedule a
// reconnect if one is not already pending.
producer.on('error',function(err){
producer.close();
client.close(); // Comment out for client on close
if ( reconnectInterval == null) { // Multiple Error Events may fire, only set one connection retry.
reconnectInterval =
setTimeout(function () {
log.info("reconnect is called in producer error event");
// New instances are assigned here, but no 'ready', 'error' or 'close'
// listener is registered on them, and reconnectInterval is not reset.
client = new kafka.Client(kafkaZookeeperConnStr,'AppAnalyzerStorm')
producer = new Producer(client);
}, timeToRetryConnection);
}
});
// Client 'error' handler: same flow as the producer 'error' handler, and it
// also clears the kafkaConnected flag.
client.on('error',function(err){
kafkaConnected = false;
producer.close();
client.close(); // Comment out for client on close
if ( reconnectInterval == null) { // Multiple Error Events may fire, only set one connection retry.
reconnectInterval =
setTimeout(function () {
log.info("reconnect is called on client error event");
// As above: the replacement client/producer get no listeners here.
client = new kafka.Client(kafkaZookeeperConnStr,'AppAnalyzerStorm')
producer = new Producer(client);
}, timeToRetryConnection);
}
});
// Client 'close' handler: the client is already closing, so only the producer
// is closed before the reconnect is scheduled.
client.on('close',function(err){
kafkaConnected = false;
producer.close();
if ( reconnectInterval == null) { // Multiple Error Events may fire, only set one connection retry.
reconnectInterval =
setTimeout(function () {
log.info("reconnect is called on client close");
// As above: the replacement client/producer get no listeners here.
client = new kafka.Client(kafkaZookeeperConnStr,'AppAnalyzerStorm')
producer = new Producer(client);
}, timeToRetryConnection);
}
});
I found that the producer.on('ready') handler is never called, yet messages can still be sent successfully to Kafka.
What happens to the producer 'ready' event? Why is it never invoked?
// 'ready' handler under discussion: logs, then cancels and resets the pending
// reconnect timer if one is set. It is attached to whichever object
// `producer` refers to at the moment this statement runs.
producer.on('ready',function(){
log.info("kafka is ready!!");
if(reconnectInterval!=null) {
clearTimeout(reconnectInterval);
reconnectInterval =null;
}
});
Hi, in the following code:
// Producer 'error' handler: closes the current producer and client, then
// schedules a single delayed reconnect.
producer.on('error',function(err){
producer.close();
client.close(); // Comment out for client on close
if ( reconnectInterval == null) { // Multiple Error Events may fire, only set one connection retry.
reconnectInterval =
setTimeout(function () {
log.info("reconnect is called in producer error event");
// A new client and producer are assigned, but no event listener
// ('ready', 'error', 'close') is registered on either of them here.
client = new kafka.Client(kafkaZookeeperConnStr,'AppAnalyzerStorm')
producer = new Producer(client);
}, timeToRetryConnection);
}
});
you only create a new producer instance in the error handler, but you never attach a 'ready' listener to that new instance, so of course it never receives the 'ready' event.
Same issue for me when the broker gets restarted.
@juhanishen Hi ~ If I create a new client instance in the setTimeout callback function, it does not seem to be connected to my existing event listeners, such as consumer.on('message'). How can I solve this?
{ Error: connect ECONNREFUSED 104.211.246.27:9092
at Object._errnoException (util.js:1024:11)
at _exceptionWithHostPort (util.js:1046:20)
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1182:14)
code: 'ECONNREFUSED',
errno: 'ECONNREFUSED',
syscall: 'connect',
address: '104.211.246.27',
port: 9092 }
Most helpful comment
Same issue for me when the broker gets restarted.