Hi
I have a simple producer and consumer running against a single ZooKeeper instance and a single Kafka broker.
After every call to producer send() I see the following being output from the Kafka Broker:
[2014-04-18 15:42:59,165] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:498)
at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:50)
at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:46)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:285)
at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.immutable.Range.map(Range.scala:39)
at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:46)
at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:43)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227)
at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:285)
at scala.collection.immutable.Range$$anon$1.foreach(Range.scala:274)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:227)
at scala.collection.immutable.Range.flatMap(Range.scala:39)
at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:43)
at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:45)
at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:45)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:383)
at kafka.network.Processor.run(SocketServer.scala:263)
at java.lang.Thread.run(Thread.java:744)
I'm using a build of Kafka made yesterday and the latest kafka-node pulled from npm.
This is my consumer; it listens for a message and replies on another topic:
'use strict';
// Request/reply worker: consumes requests from `service.consumTopic`, dispatches
// each one to `service[msg.command]`, and hands the handler two callbacks: one to
// forward a message to another topic and one to reply on the topic/partition the
// requester named (`msg.replyTopic` / `msg.replyPartition`).
const config = require('./config.json');
const service = require(config.service);
const kafka = require('kafka-node');

const client = new kafka.Client('127.0.0.1:2181');
const consumTopic = service.consumTopic;
const consumGroup = service.consumGroup;
const offset = 0;

// One producer shared by every send. Previously a new Producer was constructed
// for every forwarded message and every reply. That leaked one instance per send
// and sent on a producer that had not yet had a chance to emit 'ready'.
const producer = new kafka.Producer(client);
producer.on('error', function (err) {
  console.log('Producer error:');
  console.dir(err);
});

// autoCommit is off, so offsets are committed explicitly after each handled request.
const consumer = new kafka.Consumer(
  client,
  [{ topic: consumTopic, offset: offset }],
  { groupId: consumGroup, fromOffset: true, autoCommit: false }
);

consumer.on('message', function (message) {
  console.log('processing request');

  let msg;
  try {
    msg = JSON.parse(message.value);
  } catch (err) {
    // An unparseable payload would otherwise throw out of the event handler and
    // take the whole worker down, so skip it instead.
    console.log('Skipping malformed message at offset ' + message.offset + ': ' + err.message);
    return;
  }

  const handler = msg !== null && typeof msg === 'object' ? service[msg.command] : undefined;
  if (typeof handler !== 'function') {
    console.log('Skipping message with unknown command: ' + (msg && msg.command));
    return;
  }

  msg.reply = msg.reply || {};
  // Call through `service` so the handler sees the same `this` as `service[cmd](...)` would.
  handler.call(service, msg, forwardMessage, sendReply(msg.replyTopic, msg.replyPartition));

  consumer.commit(function (err) {
    if (err) {
      console.log('Offset commit failed:');
      console.dir(err);
    }
  });
});

consumer.on('error', function (err) {
  console.log('Error:');
  console.dir(err);
});

/**
 * Publishes `message`, JSON-encoded, to `topic` via the shared producer.
 * @param {string} topic - destination topic
 * @param {*} message - value to JSON-encode and send
 * @param {object} [options] - accepted for handler compatibility; currently unused
 */
function forwardMessage(topic, message, options) {
  producer.send([{ topic: topic, messages: JSON.stringify(message) }], function (err, data) {
    if (err) {
      console.log('Error: ');
      console.dir(err);
      return;
    }
    console.log('Data: ' + JSON.stringify(data));
  });
}

/**
 * Builds the reply callback handed to a service handler.
 * @param {string} topic - reply topic taken from the request
 * @param {number} partition - reply partition taken from the request
 * @param {string} [reqId] - accepted for compatibility; currently unused
 * @returns {function(*): void} sends its argument, JSON-encoded, to topic/partition
 */
function sendReply(topic, partition, reqId) {
  return function (message) {
    const payload = [{ topic: topic, partition: partition, messages: [JSON.stringify(message)] }];
    producer.send(payload, function (err, data) {
      if (err) {
        console.log('Error sending reply:');
        console.dir(err);
        return;
      }
      console.log(data);
    });
  };
}
This is the client that makes the request and waits for a response:
'use strict';
// HTTP front end for the request/reply worker. Each POST /echo publishes a
// request to 'test-topic' and answers the HTTP call when the matching reply
// arrives on 'web-res'.
const uuid = require('uuid');
const express = require('express');
const app = express();
const kafka = require('kafka-node');

// autoCommit is a consumer option, not a payload field. It used to sit in the
// payload, where kafka-node does not read it, so offsets were still auto-committed.
const consumer = new kafka.Consumer(
  new kafka.Client('127.0.0.1:2181/'),
  [{ topic: 'web-res', partition: 0 }],
  { groupId: 'web-res-grp', autoCommit: false }
);
const producer = new kafka.Producer(new kafka.Client('127.0.0.1:2181/'));

// In-flight HTTP responses keyed by reqId. The old single shared `resObj` let
// concurrent requests overwrite each other, so a reply could be written to the
// wrong caller or to a response that had already ended.
const pendingResponses = new Map();

app.post('/echo', function (req, res) {
  const id = uuid.v4();
  const msg = {
    replyTopic: 'web-res',
    replyPartition: 0,
    reqId: id,
    command: 'echoTime'
  };

  pendingResponses.set(id, res);
  // Drop the entry once the response completes or the client disconnects,
  // so the map does not grow without bound.
  res.on('close', function () {
    pendingResponses.delete(id);
  });

  producer.send([{ topic: 'test-topic', messages: [JSON.stringify(msg)] }], function (err, data) {
    if (err) {
      // Without this the HTTP request would hang until the client times out.
      pendingResponses.delete(id);
      console.log('Error:');
      console.dir(err);
      res.status(502).end('Failed to send request');
      return;
    }
    console.log(data);
  });
});

consumer.on('message', function (message) {
  let m;
  try {
    m = JSON.parse(message.value);
  } catch (err) {
    console.log('Ignoring malformed reply: ' + err.message);
    return;
  }
  // NOTE(review): assumes the worker replies with the request object it received
  // (so `reqId` comes back alongside `reply`). Confirm against the service handler.
  const res = m ? pendingResponses.get(m.reqId) : undefined;
  if (!res) {
    return; // unknown, already answered, or left over from an earlier run
  }
  pendingResponses.delete(m.reqId);
  const reply = m.reply || {};
  console.log(reply.time);
  res.end(reply.time + ' ' + reply.weather);
});

consumer.on('error', function (err) {
  console.log('Error:');
  console.dir(err);
});

app.listen(3000);
Are your topics already present in Kafka when you start your Node server? I have noticed that when kafka-node first creates a topic, I get a LeaderNotAvailable error when I try to consume from it. Simply restarting my Node server lets the consumer see a leader and consume properly. I'm not sure whether leaders are elected on a per-topic basis. I may open an issue about this later, but I'm still looking into it.
I haven't tried creating a topic with the producer in a while.
I just realized the scenario I mentioned above was addressed in #37. Perhaps it is broken again?
close.
Looks like it is broken again; I'm facing the same issue on v0.5.0.
@masumsoft what version of kafka are you connected to? Can you create a unit test that reproduces the issue?
@hyperlink
Hi, I am using version 0.5.8
{ BrokerNotAvailableError: Could not find the leader
at new BrokerNotAvailableError (/home/ubuntu/message-sender/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Client.sendToBroker (/home/ubuntu/message-sender/js/node_modules/kafka-node/lib/client.js:481:17)
at Client.send (/home/ubuntu/message-sender/js/node_modules/kafka-node/lib/client.js:443:10)
at Client.sendFetchRequest (/home/ubuntu/message-sender/js/node_modules/kafka-node/lib/client.js:199:8)
at Consumer.fetch (/home/ubuntu/message-sender/js/node_modules/kafka-node/lib/consumer.js:181:15)
at Immediate.<anonymous> (/home/ubuntu/message-sender/js/node_modules/kafka-node/lib/consumer.js:99:12)
at runCallback (timers.js:574:20)
at tryOnImmediate (timers.js:554:5)
at processImmediate [as _immediateCallback] (timers.js:533:5) message: 'Could not find the leader' }
After this error, consumer lag builds up on the topics and I have to restart the entire process manually before it starts consuming again.
My current configuration is
"groupId": "crunner-nodejs",
"zookeeper": "zookeeper-1:2180,zookeeper-2:2180,zookeeper-3:2180",
"topics": [
{ "topic": "nodejs", "partition": 0 },
{ "topic": "nodejs", "partition": 1 },
{ "topic": "nodejs", "partition": 2 },
{ "topic": "nodejs", "partition": 3 }
],
Below is the code for my consumer process:
// Consumer process. `config`, `logger` and `processKafkaMessage` are defined
// outside this snippet.
const kafka = require("kafka-node");
const Consumer = kafka.Consumer;

// NOTE(review): retries: 0 presumably disables ZooKeeper connection retries.
// Worth revisiting given the "must restart to resume consuming" symptom.
const client = new kafka.Client(config.zookeeper, config.name, {
  sessionTimeout: 30000,
  spinDelay: 1000,
  retries: 0
});

const consumer = new Consumer(client, config.topics, {
  groupId: config.groupId,
  autoCommit: true
});

consumer.on("message", function (message) {
  try {
    const msg = JSON.parse(message.value);
    processKafkaMessage(msg);
    const offsets = logger.getLogger("kafka", "offsets", true);
    logger.info(offsets, "message: " + message.partition + "." + message.offset);
  } catch (err) {
    // The error itself used to be discarded. Log it next to the payload so
    // failures can actually be diagnosed.
    const detailed = logger.getLogger("none", "none", true);
    logger.debug(detailed, "error: " + err + ", payload: " + JSON.stringify(message));
  }
});

consumer.on("error", function (err) {
  console.error(err);
});

// On Ctrl-C, close the consumer (force=true) before exiting.
process.on("SIGINT", function () {
  console.log("got kill signal");
  if (consumer === undefined) {
    process.exit();
    return;
  }
  consumer.close(true, function () {
    process.exit();
  });
});
It would be great if the consumer reconnected and resumed consuming data after some time.
Thanks,
Ravi
Are you consuming on specific partitions? Could you try with the highLevelConsumer? That's the one that's most tested against failures.
Thanks @hyperlink ,
Will try with high level consumer and let you know if problem persists.
I had this issue too with Consumer. When I moved over to HighLevelConsumer, it doesn't even appear to respect the offset and fromOffset settings. Supposedly these take the same syntax in the payloads and options areas. Here's my sample code:
const kafka = require('kafka-node');
const HighLevelConsumer = kafka.HighLevelConsumer; // Doesn't obey offset...
const Consumer = kafka.Consumer; // Results in 'Could not find the leader'
const Offset = kafka.Offset;
const Client = kafka.Client;

// `kafaConnectionString` and `topic` are defined outside this snippet.
const client = new Client(kafaConnectionString);
const offset = new Offset(client);
const partition = 0;

// time: -1 asks the broker for the latest offset (-2 would be the earliest).
// A wall-clock timestamp such as Date.now() is resolved by the broker at
// log-segment granularity, so it can return the start of an older segment
// instead of the head of the log.
offset.fetch([{ topic: topic, partition: partition, time: -1, maxNum: 1 }], function (err, data) {
  if (err) {
    // `data` is not usable when the fetch failed, so don't index into it.
    console.log('error', err);
    return;
  }
  const latestOffset = data[topic][partition][0];

  // NOTE(review): reportedly HighLevelConsumer does not honour the per-payload
  // `offset` below. A plain Consumer with one payload per partition did.
  const consumer = new HighLevelConsumer(
    client,
    [{
      topic: topic,
      partition: partition,
      offset: latestOffset
    }],
    {
      autoCommit: false,
      fromOffset: true
    }
  );
  consumer.on('message', (message) => {
    // ... my message processing here...
  });
  consumer.on('error', (err) => {
    console.log('error', err);
  });
});
@patricknelson I would use the new ConsumerGroup if possible. HighLevelConsumer is likely to be deprecated soon by Kafka.
Unfortunately I just end up with 'Broker not available' errors. The same doesn't happen when using HighLevelConsumer, only with ConsumerGroup.
kafka-node:ConsumerGroup Connecting kafka-node-client +1m
kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'kafka-server-here',
coordinatorPort: 9092,
coordinatorId: 2 } +12ms
kafka-node:ConsumerGroupRecovery RECOVERY from connect: kafka-node-client retrying in 110200 ms { [BrokerNotAvailableError: Broker not available] message: 'Broker not available' } +11ms
Actually I think this is because I'm using Kafka 0.8, so I guess ConsumerGroup won't work for me. How can I get HighLevelConsumer to offset from the latest messages instead of flooding the client with millions of messages that are a week old (when we still need that retention)?
EDIT @hyperlink I ended up fixing my issue by just resorting to using Consumer instead. I originally tried this but I didn't realize we were publishing on so many partitions, so I only had it configured for one (partition 0) and just used offset.fetch instead of offset.fetchLatestOffsets which then allowed me to get all the necessary offsets for each partition resulting in the consistent stream of messages that I needed. Sorry for the irrelevant posts here! (part of why I just edited my comment)
Most helpful comment
Actually I think this is because I'm using Kafka 0.8, so I guess `ConsumerGroup` won't work for me. How can I get `HighLevelConsumer` to start from the latest messages instead of flooding the client with millions of messages that are a week old (when we still need that retention)?
EDIT @hyperlink I ended up fixing my issue by just resorting to using `Consumer` instead. I originally tried this, but I didn't realize we were publishing on so many partitions, so I only had it configured for one (partition 0).
I also just used `offset.fetch` instead of `offset.fetchLatestOffsets`, which then allowed me to get all the necessary offsets for each partition, resulting in the consistent stream of messages that I needed. Sorry for the irrelevant posts here! (part of why I just edited my comment)