Node-rdkafka: Consumer stops fetching messages

Created on 30 May 2017 · 24 comments · Source: Blizzard/node-rdkafka

Hi,

The consumer stops fetching after getting the following error:
{ Error: Local: Broker transport failure at Error (native) origin: 'local', message: 'broker transport failure', code: -1, errno: -1, stack: 'Error: Local: Broker transport failure\n at Error (native)' }

All Kafka servers are healthy, yet I still get the transport failure.
After this error, the consumer stops fetching further messages.


All 24 comments

The consumer will not learn the new state of the brokers unless you at least keep calling consume() so that it keeps pumping. Kafka depends on the fetch requests to get information about the state of the brokers.

How are you consuming? Which method are you using?

@webmakersteve I am using the consumer like this:

var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
    'group.id': 'calllog11',
    'metadata.broker.list': process.env.KAFKA_SERVERS,
    'enable.auto.commit': false
}, { 'auto.offset.reset': process.env.AUTO_OFFSET_RESET });

consumer.connect();

consumer
    .on('ready', function() {
        console.log('consumer ready');
        consumer.subscribe(['calllog']);

        // Request up to 500 messages once per second; each fetched
        // message is emitted as a 'data' event
        setInterval(function() {
            consumer.consume(500);
        }, 1000);
    });

Where is that error getting logged? What block of code? You must have an error handler of some sort doing that, correct?

Closing this issue because of inactivity. Please re-open and comment if you're still experiencing the issue on the most up to date version of node-rdkafka :)

Hi

@webmakersteve I've run into the same problem. At some point the consumer stops receiving messages from the topic, and the only thing I see in the log is:

2018-05-23T13:29:10.303Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:33:55.362Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:39:10.401Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:43:55.442Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:53:55.537Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}
2018-05-23T13:54:35.542Z Pid: 26748 KafkaProducer:error +ERR - initServer.event.error err: {"origin":"local","message":"broker transport failure","code":-1,"errno":-1,"stack":"Error: Local: Broker transport failure"}

node v9.11.1
[email protected]

My consumer:

        this.consumer = new Kafka.KafkaConsumer({
            'debug':                    'all',
            'group.id':                 config.kafka.consumer.group_id,
            'metadata.broker.list':     config.kafka.consumer.brokers,
            'rebalance_cb': function(err, assignment) {
                log("+OK - rebalance cb");

                if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
                    log("+OK - rebalance cb. assign: %j", assignment);
                    // Note: this can throw when you are disconnected. Take care and wrap it in
                    // a try catch if that matters to you

                    try {
                        this.assign(assignment).committed(10000, (err, data) => {
                            log("+OK - rebalance_cb. committed data: %j", data);
                        });
                    } catch(err) {
                        error("+OK - rebalance_cb. can not assign assignments err: %j", err);
                    }

                } else if (err.code == Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS){
                    log("+OK - rebalance cb. unassign: ", assignment);

                    try {
                        // Same as above
                        this.unassign();
                    } catch(err) {
                        error("+OK - rebalance_cb. can not unassign assignments err: %j", err);
                    }
                } else {
                    // We had a real error
                    error("+ERR - rebalance cb. err: ", err);
                }

            },
            'offset_commit_cb': (err, topicPartitions) => {

                if (err) {
                    error("+ERR - offset commit cb. err: ", err);
                    // There was an error committing
                } else {
                    log("+OK - offset commit cb. topicPartitions: ", topicPartitions);
                    // Commit went through. Let's log the topic partitions
                }
            }
        }, {});

        //logging debug messages, if debug is enabled
        this.consumer.on('event.log', ev => {
            // log("+OK - initServer.event.log ", ev);
        });

        //logging all errors
        this.consumer.on('event.error', err => {
            error("+ERR - initServer.event.error err: %j", err);
        });

        this.consumer.on('ready', (info, metadata) => {
            log('+OK - initServer.ready. info %j metadata: %j', info, metadata);

            this.consumer.subscribe(config.kafka.consumer.topics);

            //start consuming messages
            this.consumer.consume();

            if(cb) cb();
        });

        this.consumer.on('data', m => {

            let parsed_message;

            try { parsed_message = JSON.parse(m.value.toString()); } catch(e) {}

            // Do some stuff with message
        });


        this.consumer.on('disconnected', arg => {
            log('+OK - initServer.disconnected. %j', arg);
        });

        //starting the consumer
        this.consumer.connect();

I don't see any "rebalance" or "disconnected" events in the logs, and according to Kafka Tool the consumer is still connected and subscribed to the topic, but it doesn't receive any messages.

@affair Did you solve this? We are having the same problem and can't see any reason for it.

@gabrielo91 No, I didn't. I also can't reproduce it on the production server.
Which version are you using? Are you able to reproduce it?

We are using 2.4.2. In our case we can't reproduce the error in the local environment, where everything works well. In production the consumer just stops receiving messages, and it doesn't emit any event or error.

I'm having the same problem with node-rdkafka v2.4.2 on Linux.

This is most probably a broker-side problem. It can happen when your brokers advertise a host that is inaccessible to the consumer (like localhost). Can you run a get-metadata call with kafkacat or node-rdkafka and post the output here?
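The metadata check suggested above can also be done from node-rdkafka. The following is a minimal sketch, assuming an already connected client (it calls `getMetadata(options, callback)` with the options object first). `checkAdvertisedHosts` and `findLoopbackBrokers` are hypothetical helper names, and the loopback pattern is only a heuristic for "a host a remote consumer cannot reach". With kafkacat, `kafkacat -L -b <broker>:<port>` lists the same broker metadata.

```javascript
// Hypothetical helper: return the brokers whose advertised host looks like a
// loopback address, which a consumer on another machine could not reach.
function findLoopbackBrokers(metadata) {
  const loopback = /^(localhost|127\.\d+\.\d+\.\d+|::1|0\.0\.0\.0)$/i;
  return metadata.brokers.filter((broker) => loopback.test(broker.host));
}

// Fetch cluster metadata from an already connected node-rdkafka client
// (producer or consumer), print what each broker advertises, and pass the
// suspicious brokers to `done`.
function checkAdvertisedHosts(client, timeoutMs, done) {
  // getMetadata takes the options object first and the callback second.
  client.getMetadata({ timeout: timeoutMs }, (err, metadata) => {
    if (err) {
      done(err);
      return;
    }
    for (const broker of metadata.brokers) {
      console.log(`broker ${broker.id} advertises ${broker.host}:${broker.port}`);
    }
    done(null, findLoopbackBrokers(metadata));
  });
}
```

Usage would be something like calling `checkAdvertisedHosts(consumer, 10000, (err, bad) => ...)` from the consumer's `'ready'` handler; a non-empty `bad` list points at the broker's advertised listener configuration rather than at the client.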

Thanks, I'll set that up. In my case, I am consuming messages for a time before it gives up. What I have noticed is that occasionally I will see this error:

{ Error: Local: Broker transport failure
origin: 'local',
message: 'broker transport failure',
code: -1,
errno: -1,
stack: 'Error: Local: Broker transport failure' }

Most of the time the client recovers and still consumes messages, but at some point this error gets thrown and the consumer stops. I have been able to work around the issue with the following code:

consumer.on('ready', function() {
    console.log("Consumer Ready");
    consumer.subscribe([topic]);
    consumer.consume();
});

consumer.on('event.error', function(err) {
    console.log(err);
    consumer.disconnect();
});

consumer.on('disconnected', function(data) {
    console.log("Disconnected. Reconnecting...");
    consumer.connect();
});

When I do this, the client reconnects to the broker and starts consuming the messages again.
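A minimal sketch of the same disconnect-and-reconnect workaround, with two guards the snippet above lacks: a `reconnecting` flag so that a burst of `'event.error'` events does not trigger several overlapping disconnects, and a `closing` flag so that a deliberate shutdown does not immediately reconnect. `attachReconnect` and `retryDelayMs` are hypothetical names. It assumes that `connect(options, callback)` reports a failed connection through its callback without also emitting `'disconnected'`, and it registers the `'ready'` handler itself, so the caller should not subscribe separately. Like the original workaround, it reconnects on every `'event.error'`, including errors the client would have recovered from on its own.

```javascript
// Attach reconnect-on-error behaviour to a node-rdkafka KafkaConsumer.
// Returns a shutdown function to use instead of calling disconnect() directly.
function attachReconnect(consumer, topics, retryDelayMs) {
  let reconnecting = false; // a disconnect/reconnect cycle is in progress
  let closing = false;      // the application asked for a clean shutdown

  function reconnectLater() {
    // Wait before reconnecting so a flapping broker is not hammered.
    setTimeout(() => {
      if (closing) return;
      consumer.connect({}, (err) => {
        // A failed connect leaves us disconnected, so schedule another try.
        if (err && !closing) reconnectLater();
      });
    }, retryDelayMs);
  }

  consumer.on('ready', () => {
    reconnecting = false;
    consumer.subscribe(topics);
    consumer.consume(); // flowing mode: messages arrive as 'data' events
  });

  consumer.on('event.error', (err) => {
    console.error('consumer error:', err);
    if (reconnecting || closing) return;
    reconnecting = true;
    consumer.disconnect();
  });

  consumer.on('disconnected', () => {
    if (!closing) reconnectLater();
  });

  return function shutdown(cb) {
    closing = true;
    consumer.disconnect(cb);
  };
}
```

Keeping the shutdown path separate matters because `'disconnected'` fires for intentional disconnects too; without the flag, the process could never stop the consumer cleanly.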

The callback from the metadata call in node-rdkafka did not fire for me


@webmakersteve What do you think about this solution? And how should we handle the 'disconnected' event from the library?

You should only need to call consume() again on those broker errors. This is part of why I want to get rid of that functionality.

The consume loop will consume until it gets an error, and then stop feeding the data callback. Otherwise, reconnection and disconnection should be handled entirely by the client.
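One way to avoid depending on the flowing-mode loop is to drive non-flowing consumption from JavaScript. The following is a minimal sketch, assuming each `consumer.consume(batchSize, callback)` call eventually invokes its callback with either an error or an array of messages. The next fetch is scheduled whether or not the previous one failed, so a single error does not end consumption. If the native call itself never returns (the stuck worker suspected later in this thread), this loop stalls too. `startPollLoop`, `batchSize`, `idleDelayMs`, `onMessage` and `onError` are hypothetical names. node-rdkafka also emits each fetched message as a `'data'` event in this mode (the `setInterval` snippet earlier relies on that), so handle messages either in `onMessage` or in a `'data'` listener, not both.

```javascript
// Non-flowing consume loop: fetch up to `batchSize` messages per call and
// schedule the next call after every result, including errors.
function startPollLoop(consumer, { batchSize, idleDelayMs, onMessage, onError }) {
  let stopped = false;

  function poll() {
    if (stopped) return;
    consumer.consume(batchSize, (err, messages) => {
      if (err) {
        onError(err);
      } else {
        messages.forEach(onMessage);
      }
      // A full batch suggests more messages are waiting: poll again right
      // away. Otherwise (error or partial batch) back off briefly.
      const full = !err && messages.length === batchSize;
      setTimeout(poll, full ? 0 : idleDelayMs);
    });
  }

  poll();
  return function stop() {
    stopped = true;
  };
}
```

This would typically be started from the `'ready'` handler after `subscribe()`, in place of the argument-less `consume()` call.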

@robomeister The first parameter is the options object, and the second is the callback. Did you pass them in the right order? The callback should fire even in cases of failure.


So, the proper way to implement reconnecting would be:

consumer.on('ready', function() {
    console.log("Consumer Ready");
    consumer.subscribe([topic]);
    consumer.consume();
});

consumer.on('event.error', function(err) {
    console.log(err);
    consumer.consume();
});

Is that correct?

I do eventually get an error; I wasn't being patient enough. I'll give it a try with just the call to consume. Thanks!

Hi again. Attempting a re-consume on the error didn't work for me. It has to be a full disconnect.

That's strange behavior then. It may be that the loop on the worker thread gets stuck. I'm going to classify this as a bug then, but it likely won't be resolved until flowing mode is re-implemented in JS land.

Can you open a new issue?

Hi @webmakersteve, we're facing the same problem here :( Is there another open issue for this, or an ETA for a fix?
Thanks in advance!

Hi @webmakersteve, we are facing the same issue in our environment.
Thanks

Hi @webmakersteve, I'm also facing the same issue. Any update on when the fix will be applied?

Bumping this - I believe that I'm also facing this issue.

Hi @webmakersteve, I am facing the same issue while consuming messages in the production environment; please help with this. I have implemented it in .NET Core.

Message: Worker Error while consuming the message.
"Reason":"Local: Broker transport failure

