const kafka = require('kafka-node');
const kafkaClient = new kafka.Client('<my-remote-url>:2181');
const kafkaProducer = new kafka.Producer(kafkaClient);
kafkaClient.on('error', function (err) {
logger.error('Kafka Client error: ', err);
});
kafkaClient.on('ready', () => console.log('ready'));
kafkaClient.on('connect', function () {
console.log('connected');
kafkaClient.loadMetadataForTopics([], function (error, results) {
if (error) {
return console.error(error);
}
console.log(results);
});
});
kafkaProducer.on('error', function (err) {
logger.error('Kafka Producer error: ', err);
});
// And then later on...
kafkaProducer.send([{
topic: 'test',
messages: ['test message']
}], function (err) {
console.log(err);
});
None of my event listeners result in anything being logged, so I'm not sure where to begin. Using env DEBUG="kafka-node:*" when running my server doesn't seem to provide any additional output.
{ BrokerNotAvailableError
at new BrokerNotAvailableError (/Users/robert/projects/my-project/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Client.loadMetadataForTopics (/Users/robert/projects/my-project/node_modules/kafka-node/lib/client.js:348:15)
at Client.send (/Users/robert/projects/my-project/node_modules/kafka-node/lib/client.js:515:10)
at /Users/robert/projects/my-project/node_modules/kafka-node/lib/client.js:220:10
at /Users/robert/projects/my-project/node_modules/kafka-node/node_modules/async/lib/async.js:52:16
at Object.async.forEachOf.async.eachOf (/Users/robert/projects/my-project/node_modules/kafka-node/node_modules/async/lib/async.js:236:30)
at Object.async.forEach.async.each (/Users/robert/projects/my-project/node_modules/kafka-node/node_modules/async/lib/async.js:209:22)
at Client.sendProduceRequest (/Users/robert/projects/my-project/node_modules/kafka-node/lib/client.js:218:9)
at Producer.BaseProducer.send (/Users/robert/projects/my-project/node_modules/kafka-node/lib/baseProducer.js:123:10)
at Object.<anonymous> (/Users/robert/projects/my-project/ws-handler.js:37:15)
at Module._compile (module.js:569:30)
at Object.Module._extensions..js (module.js:580:10)
at Module.load (module.js:503:32)
at tryModuleLoad (module.js:466:12)
at Function.Module._load (module.js:458:3)
at Module.require (module.js:513:17)
at require (internal/module.js:11:18)
at Object.<anonymous> (/Users/robert/projects/my-project/app.js:139:5)
at Module._compile (module.js:569:30)
at Object.Module._extensions..js (module.js:580:10)
at Module.load (module.js:503:32)
at tryModuleLoad (module.js:466:12)
at Function.Module._load (module.js:458:3)
at Function.Module.runMain (module.js:605:10)
at startup (bootstrap_node.js:158:16)
at bootstrap_node.js:575:3 message: 'Broker not available' }
And when I use kafkacat to confirm that everything is set up right:
kafkacat -b <my-remote-url>:9092 -L
Metadata for all topics (from broker 0: <my-remote-url>:9092/0):
1 brokers:
broker 0 at <my-remote-url>:9092
1 topics:
topic "test" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
It seems that everything is set up right, with 1 broker that the kafka-node client should be able to connect to. Are there any issues that you can see from what I've posted?
I should also mention that I gave it plenty of time before executing the kafkaProducer.send function; it wasn't being called immediately like my code sample shows
You are most likely not even connected to your broker.. which is why you see the BrokerNotAvailable Error when calling .send()..
As of now, kafka-node requires a zookeeper to connect to the broker - since your ready and connect events are not firing, the client is not able to connect to zk in the first place.
The zk module wont fire/log any events when its unable to connect it will just try to reconnect silently.
Using kafkacat you directly connect to the broker and omit the procedure a kafka-node has to run through.
@krystianity I also tried the branch discussed in #666 to bypass ZK, but I have the same issue. Any ideas?
Can't you use zk for now? Until #666 moves into master/production?
I can, I just have to figure out why it can't connect to zk then :/
@krystianity I got it figured out, it was just a zookeeper config change. Thanks for the help
@RobertHerhold I am having the same issues . Can you please tell me what was the config change that was made so that I can try it out?
The title and content is not matching.
Title say: On event is not getting emited by kafak-node lib...
and Body Say: Broker is not available.
Lets solve thing one by one...
1) Broker is not available.
Complete details mention at #160.
2) Producer and Consumer not emitting the ready event
Please try below code
var intCounter = 104;
setInterval(() => {
setMessageInTopic('topic'+intCounter, function(response){
echo("Producer Response"+response);
if(response){
echo("Consumer init");
getMessageFromKafak(response,function(pStrMessage){
echo ("------------------------Consumer message --------------------");
echo(pStrMessage);
});
}else{
echo("Error");
}
});
intCounter++;
},5000);
function setMessageInTopic(pStrTopicName, callback){
const kafka = require('kafka-node');
try {
const Producer = kafka.Producer;
const client = new kafka.KafkaClient({kafkaHost:"localhost:9094"});
const producer = new Producer(client);
const kafka_topic = pStrTopicName;
console.log(kafka_topic);
let payloads = [
{
topic: kafka_topic,
messages: {'name':'vipin'}
}
];
producer.on('ready', async function() {
let push_status = producer.send(payloads, (err, data) => {
if (err) {
console.log('[kafka-producer -> '+kafka_topic+']: broker update failed');
callback(false);
} else {
console.log('[kafka-producer -> '+kafka_topic+']: broker update success');
callback(pStrTopicName);
}
});
});
producer.on('error', function(err) {
console.log(err);
console.log('[kafka-producer -> '+kafka_topic+']: connection errored');
callback(false);
///throw err;
});
}
catch(e) {
console.log(e);
callback(false);
}
}
function getMessageFromKafak(pStrTopicName, callback){
try{
var kafka = require('kafka-node');
//var HighLevelProducer = kafka.HighLevelProducer;
var Consumer = kafka.Consumer;
var client = new kafka.KafkaClient({kafkaHost:"localhost:9094"});
let consumer = new Consumer(
client,
[{ topic: pStrTopicName}],
{
autoCommit: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
encoding: 'utf8',
fromOffset: false
}
);
echo("------------------RESPONE MESSAGE PROCESS "+pStrTopicName+" ----------------------")
consumer.on('message', function (message) {
echo("------------------RESPONE MESSAGE ----------------------")
console.log(message);
consumer.close();
client.close();
return callback(message);
}).on('error', function (message) {
echo("------------------RESPONE MESSAGE ERROR ----------------------")
console.log(message);
consumer.close();
client.close();
return callback(message);
});
}catch(e) {
console.log(e);
}
}
Hope that helps.
Thanks & Regards
Jaiswar Vipin Kumar R.
Most helpful comment
@krystianity I got it figured out, it was just a zookeeper config change. Thanks for the help