Hi,
The consumer stops fetching after getting the following error:
{ Error: Local: Broker transport failure
at Error (native)
origin: 'local',
message: 'broker transport failure',
code: -1,
errno: -1,
stack: 'Error: Local: Broker transport failure\n at Error (native)' }
All Kafka servers are healthy, yet I still get the transport failure.
After this error, the consumer stops fetching further messages.
The consumer will not learn the new state of the brokers unless you keep calling consume(), if only to keep it pumping. Kafka depends on the fetch requests to get information about the state of the brokers.
How are you consuming? Which method are you using?
@webmakersteve I am using the consumer like this:
var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
  'group.id': 'calllog11',
  'metadata.broker.list': process.env.KAFKA_SERVERS,
  'enable.auto.commit': false
}, { 'auto.offset.reset': process.env.AUTO_OFFSET_RESET });

consumer.connect();

consumer
  .on('ready', function() {
    console.log('consumer ready');
    consumer.subscribe(['calllog']);
    // Request up to 500 messages once per second
    setInterval(function() {
      consumer.consume(500);
    }, 1000);
  });
Where is that error getting logged? What block of code? You must have an error handler of some sort doing that, correct?
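For reference, a minimal `event.error` handler of the kind asked about here could look like the sketch below. It logs the fields that appear in the errors above (`origin`, `code`, `message`) and flags broker transport failures. Because the errors in this thread report `code: -1` rather than librdkafka's `ERR__TRANSPORT` value (-195, exposed by node-rdkafka as `Kafka.CODES.ERRORS.ERR__TRANSPORT`), the sketch assumes it is safer to match on the message text as well. `describeKafkaError` and `isBrokerTransportFailure` are hypothetical helper names, not node-rdkafka APIs.

```javascript
// Numeric librdkafka code for "Local: Broker transport failure" (RD_KAFKA_RESP_ERR__TRANSPORT).
const ERR__TRANSPORT = -195;

// Returns true if an error emitted on 'event.error' looks like a broker transport failure.
// The errors reported in this thread carry code -1, so the message text is checked too.
function isBrokerTransportFailure(err) {
  if (!err) return false;
  if (err.code === ERR__TRANSPORT) return true;
  return /broker transport failure/i.test(String(err.message || ''));
}

// Builds a compact, single-line description of a node-rdkafka style error object.
function describeKafkaError(err) {
  return `[${err.origin || 'unknown'}] code=${err.code} message=${err.message}`;
}

// Usage with a real consumer (assumes `consumer` is a node-rdkafka KafkaConsumer):
// consumer.on('event.error', (err) => {
//   console.error(describeKafkaError(err));
//   if (isBrokerTransportFailure(err)) { /* decide whether to restart consumption */ }
// });
```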
Closing this issue because of inactivity. Please re-open and comment if you're still experiencing the issue on the most up to date version of node-rdkafka :)
Hi
@webmakersteve I've run into the same problem. At some point the consumer stops receiving messages from the topic, and the only thing I see in the log is:
2018-05-23T13:29:10.303Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:33:55.362Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:39:10.401Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:43:55.442Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:53:55.537Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:54:35.542Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
node v9.11.1
[email protected]
My consumer:
this.consumer = new Kafka.KafkaConsumer({
  'debug': 'all',
  'group.id': config.kafka.consumer.group_id,
  'metadata.broker.list': config.kafka.consumer.brokers,
  'rebalance_cb': function(err, assignment) {
    log("+OK - rebalance cb");
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      log("+OK - rebalance cb. assign: %j", assignment);
      // Note: this can throw when you are disconnected. Take care and wrap it in
      // a try catch if that matters to you
      try {
        this.assign(assignment).committed(10000, (err, data) => {
          log("+OK - rebalance_cb. committed data: %j", data);
        });
      } catch(err) {
        error("+OK - rebalance_cb. can not assign assignments err: %j", err);
      }
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      log("+OK - rebalance cb. unassign: ", assignment);
      try {
        // Same as above
        this.unassign();
      } catch(err) {
        error("+OK - rebalance_cb. can not unassign assignments err: %j", err);
      }
    } else {
      // We had a real error
      error("+ERR - rebalance cb. err: ", err);
    }
  },
  'offset_commit_cb': (err, topicPartitions) => {
    if (err) {
      // There was an error committing
      error("+ERR - offset commit cb. err: ", err);
    } else {
      // Commit went through. Let's log the topic partitions
      log("+OK - offset commit cb. topicPartitions: ", topicPartitions);
    }
  }
}, {});

// Logging debug messages, if debug is enabled
this.consumer.on('event.log', ev => {
  // log("+OK - initServer.event.log ", ev);
});

// Logging all errors
this.consumer.on('event.error', err => {
  error("+ERR - initServer.event.error err: %j", err);
});

this.consumer.on('ready', (info, metadata) => {
  log('+OK - initServer.ready. info %j metadata: %j', info, metadata);
  this.consumer.subscribe(config.kafka.consumer.topics);
  // Start consuming messages
  this.consumer.consume();
  if (cb) cb();
});

this.consumer.on('data', m => {
  let parsed_message;
  try { parsed_message = JSON.parse(m.value.toString()); } catch(e) {}
  // Do some stuff with message
});

this.consumer.on('disconnected', arg => {
  log('+OK - initServer.disconnected. %j', arg);
});

// Starting the consumer
this.consumer.connect();
I don't see any "rebalance" or "disconnect" calls in logs and according to Kafka-Tool the consumer is still connected and subscribed on topic but doesn't receive any messages.
@affair, did you solve this? We are having the same problem and can't see any reason for it.
@gabrielo91 No, I didn't. Also, I cannot reproduce it on the production server.
Which version do you use? Are you able to reproduce it?
We are using 2.4.2. In our case we can't reproduce the error in a local environment, where everything works well. In production, the consumer just stops receiving messages without emitting any event or error.
I'm having the same problem with node-rdkafka v2.4.2 on Linux.
This is most probably a broker-side problem. It can happen when your brokers advertise a host that is inaccessible to the consumer (like localhost). Can you run a metadata request with kafkacat or node-rdkafka and post the output here?
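A minimal sketch of the node-rdkafka side of that check, assuming an already-connected client (producer or consumer) whose `getMetadata(options, callback)` passes a metadata object with a `brokers` array of `{ id, host, port }`. It prints what each broker advertises and flags hosts that are only reachable from the broker machine itself. The list of "local" hosts and the helper names `suspiciousBrokers` and `checkAdvertisedBrokers` are illustrative assumptions. With kafkacat, the equivalent listing is `kafkacat -L -b <broker>`.

```javascript
// Hostnames that a remote consumer usually cannot reach when a broker advertises them.
const LOCAL_HOSTS = new Set(['localhost', '127.0.0.1', '::1', '0.0.0.0']);

// Given the metadata object from getMetadata's callback, returns the brokers
// whose advertised host looks local (unreachable from another machine).
function suspiciousBrokers(metadata) {
  return (metadata.brokers || []).filter((b) => LOCAL_HOSTS.has(String(b.host).toLowerCase()));
}

// Requests cluster metadata (options first, callback second) and reports the
// advertised brokers plus any that look unreachable.
function checkAdvertisedBrokers(client, done) {
  client.getMetadata({ timeout: 10000 }, (err, metadata) => {
    if (err) return done(err);
    const brokers = (metadata.brokers || []).map((b) => `${b.id}@${b.host}:${b.port}`);
    done(null, { brokers, suspicious: suspiciousBrokers(metadata) });
  });
}
```

If `suspicious` is non-empty, the brokers' advertised listener settings are the first thing to look at.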
Thanks, I'll set that up. In my case, I consume messages for a while before it gives up. What I have noticed is that occasionally I see this error:
{ Error: Local: Broker transport failure
origin: 'local',
message: 'broker transport failure',
code: -1,
errno: -1,
stack: 'Error: Local: Broker transport failure' }
Most of the time the client recovers and keeps consuming messages, but at some point this error is emitted and the consumer stops. I have been able to work around the issue with this code:
consumer.on('ready', function() {
  console.log("Consumer Ready");
  consumer.subscribe([topic]);
  consumer.consume();
});

consumer.on('event.error', function(err) {
  console.log(err);
  consumer.disconnect();
});

consumer.on('disconnected', function(data) {
  console.log("Disconnected. Reconnecting...");
  consumer.connect();
});
When I do this, the client reconnects to the broker and starts consuming the messages again.
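The same disconnect/reconnect workaround can be wrapped in a helper with two guards added for illustration (they are not part of the original workaround): repeated errors while a reconnect is already in progress cause only one `disconnect()`, and a `disconnected` event the helper did not cause (for example, a deliberate shutdown) does not trigger `connect()`. It assumes node-rdkafka emits `ready` again after a successful `connect()`, so the existing `ready` handler re-subscribes and resumes consuming. `reconnectOnError` and its options are hypothetical names; `shouldReconnect` could, for example, restrict reconnects to transport failures.

```javascript
// Wraps the disconnect-on-error / reconnect-on-disconnected workaround with guards.
function reconnectOnError(consumer, { shouldReconnect = () => true, log = console.log } = {}) {
  let reconnecting = false;

  consumer.on('event.error', (err) => {
    log(err);
    // Ignore further errors while a reconnect is already under way.
    if (reconnecting || !shouldReconnect(err)) return;
    reconnecting = true;
    consumer.disconnect();
  });

  consumer.on('disconnected', () => {
    // Only reconnect if this helper initiated the disconnect.
    if (!reconnecting) return;
    log('Disconnected. Reconnecting...');
    consumer.connect();
  });

  // 'ready' fires again after a successful connect(); the existing 'ready'
  // handler is expected to subscribe() and consume() as before.
  consumer.on('ready', () => {
    reconnecting = false;
  });
}
```

One gap in this sketch: if `connect()` itself fails, `reconnecting` stays `true` and later errors are ignored, so a production version would need a retry path for that case.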
The callback from the metadata call in node-rdkafka did not fire for me.
@webmakersteve What do you think about this solution? And how should we handle the disconnected event from the library?
You should only need to call consume() again on those broker errors. This is part of why I want to get rid of that functionality.
The consume loop will consume until it gets an error, and then stop feeding the data callback. Otherwise, reconnection and disconnection should be handled entirely by the client.
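A minimal sketch of keeping consumption going from JavaScript instead of relying on the built-in flowing loop: each batch is requested with `consume(number, callback)`, which delivers either an error or an array of messages, and the next request is scheduled whether or not the previous one failed, so a single error does not end consumption. Unlike a fixed `setInterval`, the next request is only issued after the previous callback, so requests cannot pile up. `keepConsuming` and its options are hypothetical, and whether this avoids the stuck behaviour reported in this thread is not established here.

```javascript
// Repeatedly requests batches via consume(batchSize, callback) and schedules the next
// request after each callback, even when the callback reports an error.
// Returns a function that stops the loop; the in-flight request still completes.
function keepConsuming(consumer, {
  batchSize = 100,
  schedule = (fn) => setTimeout(fn, 0),
  onMessage = () => {},
  onError = (err) => console.error(err),
} = {}) {
  let stopped = false;

  function tick() {
    if (stopped) return;
    consumer.consume(batchSize, (err, messages) => {
      if (err) {
        onError(err);
      } else {
        messages.forEach(onMessage);
      }
      // Keep pumping: the next fetch is requested regardless of the outcome.
      schedule(tick);
    });
  }

  tick();
  return () => {
    stopped = true;
  };
}
```

In a real consumer this would be started from the `ready` handler after `subscribe()`, e.g. `keepConsuming(consumer, { batchSize: 500, onMessage: handleMessage })`, where `handleMessage` stands for your own processing function.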
@robomeister The first parameter is the options object and the second is the callback. Did you pass them in that order? It should fire even in case of failure.
So, the proper way to implement reconnection would be:
consumer.on('ready', function() {
  console.log("Consumer Ready");
  consumer.subscribe([topic]);
  consumer.consume();
});

consumer.on('event.error', function(err) {
  console.log(err);
  consumer.consume();
});
Is that correct?
I do eventually get an error; I wasn't being patient enough. I'll try it with just the call to consume(). Thanks...
Hi again. Attempting a re-consume on the error didn't work for me. It has to be a full disconnect.
That's strange behavior, then. It may be that the loop on the worker thread gets stuck. I'm going to classify this as a bug, but it likely won't be resolved until flowing mode is re-implemented in JS land.
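Since a stuck loop produces no event at all, one way to notice it is a watchdog that records when the consumer last made progress and reports a stall after a quiet period. This technique is not discussed in the thread; the sketch below is hypothetical (`createStallWatchdog`, `timeoutMs`, `onStall`) and only shows the bookkeeping. The clock is injectable so the logic can be checked without real timers.

```javascript
// Hypothetical watchdog: records when the consumer last made progress and reports
// a stall once no progress has been seen for `timeoutMs`.
function createStallWatchdog({ timeoutMs, onStall, now = () => Date.now() }) {
  let lastProgress = now();
  let stalled = false;

  return {
    // Call whenever the consumer makes progress (see the note below).
    touch() {
      lastProgress = now();
      stalled = false;
    },
    // Call periodically, e.g. from setInterval; fires onStall once per stall.
    check() {
      const idleMs = now() - lastProgress;
      if (!stalled && idleMs >= timeoutMs) {
        stalled = true;
        onStall(idleMs);
      }
      return stalled;
    },
  };
}
```

A quiet topic can legitimately deliver nothing for a long time, so calling `touch()` only per message would raise false alarms. When consuming in batches with `consume(number, callback)`, calling `touch()` in every callback, including empty ones, separates "no messages" from "no callbacks at all"; what to do on a stall (log, or disconnect and reconnect) is left to the caller.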
Can you open a new issue?
Hi @webmakersteve, we are facing the same problem here :( Is there another open issue for this, or an ETA for its resolution?
Thx in advance!!
Hi @webmakersteve, we are facing the same issue in our environment.
Thanks
Hi @webmakersteve, I'm also facing the same issue. Any update on when the fix will be applied?
Bumping this - I believe that I'm also facing this issue.
Hi @webmakersteve, I am facing the same issue while consuming messages in the production environment; please help with this. I have implemented it in .NET Core.
Message: Worker Error while consuming the message.
"Reason":"Local: Broker transport failure