Kafka-node: Error: NotLeaderForPartition and BrokerNotAvailableError: Could not find the leader when broker goes down

Created on 6 Jan 2016 · 13 Comments · Source: SOHU-Co/kafka-node

Hi!

I'm trying to understand Kafka for a university seminar, and I keep experiencing data loss when I have a high-throughput producer. It seems that although the Kafka cluster rebalances itself, a few messages can't be sent via the HighLevelProducer shortly after one broker goes down (stopped via Ctrl+C). After a few seconds everything is back to normal and no errors occur. These are the errors I see regularly during a broker outage:

  • Error: NotLeaderForPartition
  • BrokerNotAvailableError: Could not find the leader

I wanted to ask whether these errors are to be expected or whether the issue lies in my setup. It was my understanding that the whole point of Kafka was to prevent any kind of data loss.

My setup

  • 1 ZooKeeper instance, 3 Kafka servers, 1 Kafka producer.
  • Topics with replication factor 3 and 30 partitions

For my testing purposes I'm currently streaming tweets and depending on the hashtag I receive multiple tweets per second. This is a snippet from the producer code:

twitterClient.stream('statuses/filter', {
  track: topic
}, stream => {
  stream.on('data', tweet => {
    producer.send([
      {
        topic: topic,
        messages: JSON.stringify(tweet),
        attributes: 1 // Use gzip compression
      }
    ], (error, data) => {
      if(error) {console.error(`Could not produce tweet:\n${error}`);} else {
        console.log('Produced tweet');
      }
    });
  });

And this is a sample from the output:

Produced tweet
Produced tweet
Produced tweet
# Kafka broker goes down soon ...
Produced tweet
Produced tweet
# Not too bad, no data loss
{ [Error: This socket has been ended by the other party] code: 'EPIPE' }
Produced tweet
Produced tweet

# Data loss
Could not produce tweet:
BrokerNotAvailableError: Could not find the leader

Produced tweet
Produced tweet
# Not too bad, no data loss
{ [Error: connect ECONNREFUSED 141.***.***.38:9092] // obfuscated IP
  code: 'ECONNREFUSED',
  errno: 'ECONNREFUSED',
  syscall: 'connect',
  address: '141.***.***.38',
  port: 9092 }

# Data loss
Could not produce tweet:
Error: connect ECONNREFUSED 141.***.***.38:9092

# Data loss
Could not produce tweet:
BrokerNotAvailableError: Could not find the leader

Produced tweet

# Data loss
Could not produce tweet:
Error: NotLeaderForPartition

Produced tweet
Produced tweet
# Everything is normal again soon
...

If it helps, here is the kafka-topics --describe output:

Topic:big   PartitionCount:30   ReplicationFactor:3 Configs:
    Topic: big  Partition: 0    Leader: 1   Replicas: 1,0,2 Isr: 2,0,1
    Topic: big  Partition: 1    Leader: 2   Replicas: 2,1,0 Isr: 2,0,1
    Topic: big  Partition: 2    Leader: 0   Replicas: 0,2,1 Isr: 2,0,1
    Topic: big  Partition: 3    Leader: 1   Replicas: 1,2,0 Isr: 2,0,1
    Topic: big  Partition: 4    Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: big  Partition: 5    Leader: 0   Replicas: 0,1,2 Isr: 2,0,1
    Topic: big  Partition: 6    Leader: 1   Replicas: 1,0,2 Isr: 2,0,1
    Topic: big  Partition: 7    Leader: 2   Replicas: 2,1,0 Isr: 2,0,1
    Topic: big  Partition: 8    Leader: 0   Replicas: 0,2,1 Isr: 2,0,1
    Topic: big  Partition: 9    Leader: 1   Replicas: 1,2,0 Isr: 2,0,1
    Topic: big  Partition: 10   Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: big  Partition: 11   Leader: 0   Replicas: 0,1,2 Isr: 2,0,1
    Topic: big  Partition: 12   Leader: 1   Replicas: 1,0,2 Isr: 2,0,1
    Topic: big  Partition: 13   Leader: 2   Replicas: 2,1,0 Isr: 2,0,1
    Topic: big  Partition: 14   Leader: 0   Replicas: 0,2,1 Isr: 2,0,1
    Topic: big  Partition: 15   Leader: 1   Replicas: 1,2,0 Isr: 2,0,1
    Topic: big  Partition: 16   Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: big  Partition: 17   Leader: 0   Replicas: 0,1,2 Isr: 2,0,1
    Topic: big  Partition: 18   Leader: 1   Replicas: 1,0,2 Isr: 2,0,1
    Topic: big  Partition: 19   Leader: 2   Replicas: 2,1,0 Isr: 2,0,1
    Topic: big  Partition: 20   Leader: 0   Replicas: 0,2,1 Isr: 2,0,1
    Topic: big  Partition: 21   Leader: 1   Replicas: 1,2,0 Isr: 2,0,1
    Topic: big  Partition: 22   Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: big  Partition: 23   Leader: 0   Replicas: 0,1,2 Isr: 2,0,1
    Topic: big  Partition: 24   Leader: 1   Replicas: 1,0,2 Isr: 2,0,1
    Topic: big  Partition: 25   Leader: 2   Replicas: 2,1,0 Isr: 2,0,1
    Topic: big  Partition: 26   Leader: 0   Replicas: 0,2,1 Isr: 2,0,1
    Topic: big  Partition: 27   Leader: 1   Replicas: 1,2,0 Isr: 2,0,1
    Topic: big  Partition: 28   Leader: 2   Replicas: 2,0,1 Isr: 2,0,1
    Topic: big  Partition: 29   Leader: 0   Replicas: 0,1,2 Isr: 2,0,1


All 13 comments

Also see #245

@peterjuras It's not Kafka itself but the implementation of the Producer or Consumer. You would probably want to try the official Java Kafka client and see whether you still get such errors. Beyond that, this is the typical lack of synchronization between client and server, so you can see errors like ECONNREFUSED in the following scenario:

  1. (client) Broker is chosen for (topic, partition).
  2. (server) You hit Ctrl + C and broker dies.
  3. (client) Producer sends request to dead socket.

It is up to the client implementation, or to the end developer, to decide how to handle such errors.
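One way for the end developer to handle these errors is to retry the send after a delay, giving the cluster time to elect a new leader and the client time to reconnect. The following is a minimal sketch, not part of kafka-node: `sendWithRetry` and `isTransient` are hypothetical helpers, and the list of "transient" error patterns is an assumption based only on the errors reported in this thread. The only thing assumed about `producer` is a `send(payloads, callback)` method like the one used in the question.

```javascript
'use strict';

// Hypothetical list of error patterns treated as transient; taken from the
// errors reported in this thread, not from kafka-node itself.
const TRANSIENT_PATTERNS = [
  /NotLeaderForPartition/,
  /BrokerNotAvailable/,
  /ECONNREFUSED/,
  /ECONNRESET/,
  /EPIPE/,
  /socket has been ended/
];

// Returns true if a (non-null) error looks like a temporary broker/leader problem.
function isTransient(error) {
  const text = [error.name, error.message, error.code, String(error)].join(' ');
  return TRANSIENT_PATTERNS.some(pattern => pattern.test(text));
}

// Hypothetical helper: calls producer.send(payloads, cb) and retries transient
// failures with exponential backoff (baseDelayMs, then 2x, 4x, ...).
function sendWithRetry(producer, payloads, options, callback) {
  const { maxRetries = 5, baseDelayMs = 100 } = options;
  let attempt = 0;

  const trySend = () => {
    producer.send(payloads, (error, data) => {
      if (!error) {
        callback(null, data);
        return;
      }
      if (attempt >= maxRetries || !isTransient(error)) {
        callback(error); // give up: permanent error or retries exhausted
        return;
      }
      const delayMs = baseDelayMs * 2 ** attempt;
      attempt += 1;
      setTimeout(trySend, delayMs);
    });
  };

  trySend();
}
```

Retrying means a message may be written twice if the broker stored it but the acknowledgement was lost, so this gives at-least-once rather than exactly-once delivery.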

@peterjuras I'm curious if you ever got any resolution on this. I'm having the exact same issue, not even necessarily under heavy load.

Any help (from anyone, really) would be much appreciated.

Thanks!

A sampling of errors:

1474607274792 ERROR Producer: Error: This socket has been ended by the other party.
Server error: Error: read ECONNRESET | [object Object]
1474612850703 ERROR
        {"message":"Broker not available","stack":"BrokerNotAvailableError: Broker not available\n    at new BrokerNotAvailableError (/usr/local/lib/rickshaw/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:11)\n    at Socket.<anonymous> (/usr/local/lib/rickshaw/node_modules/kafka-node/lib/client.js:534:43)\n    at emitOne (events.js:96:13)\n    at Socket.emit (events.js:188:7)\n    at TCP._handle.close [as _onclose] (net.js:493:12)","isOperational":true}

@travismcchesney the client should automatically reconnect if it loses connection with the kafka broker.

It does reconnect automatically, but you have to catch the error yourself and send the message again after an error.

Thanks @hyperlink and @peterjuras !

Is there a way to retry intelligently in this case? I suspect I need to wait until the connection is re-established, but how can I know when that has happened? Does either of you have an example of a robust way to handle this case?

I currently just log the producer error, like so:

producer.on('error', e => ctx.logger.error(`Producer: ${e}.`))

Typically I see this error printed first, and then I get an app-level error:

read ECONNRESET

Lastly, about 30 to 50 minutes after the initial producer error, I end up with:

{"message":"Broker not available","stack":"BrokerNotAvailableError: Broker not available
    at new BrokerNotAvailableError (/usr/local/lib/rickshaw/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:11)
    at Socket.<anonymous> (/usr/local/lib/rickshaw/node_modules/kafka-node/lib/client.js:534:43)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:188:7)
    at TCP._handle.close [as _onclose] (net.js:493:12)","isOperational":true}

The client will emit a connect or reconnect event; perhaps you could use these events to retry the failed messages.
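Following that suggestion, failed payloads can be parked and re-sent once the client reports that it has (re)connected. This is a minimal sketch under assumptions: `RetryBuffer` is a hypothetical class, the event names `connect` and `reconnect` come from the comment above and should be checked against your kafka-node version, and `client` can be any EventEmitter.

```javascript
'use strict';

// Hypothetical helper, not part of kafka-node: payloads whose send failed are
// queued and re-sent whenever `client` emits one of `events`.
class RetryBuffer {
  constructor(client, producer, events = ['connect', 'reconnect']) {
    this.producer = producer;
    this.pending = [];
    events.forEach(name => client.on(name, () => this.flush()));
  }

  // Sends payloads; on any error they are queued instead of being dropped.
  send(payloads, callback = () => {}) {
    this.producer.send(payloads, (error, data) => {
      if (error) {
        this.pending.push({ payloads, callback });
      } else {
        callback(null, data);
      }
    });
  }

  // Re-sends every queued batch; batches that fail again are re-queued.
  flush() {
    const queued = this.pending;
    this.pending = [];
    queued.forEach(({ payloads, callback }) => this.send(payloads, callback));
  }
}
```

The queue is unbounded and every error is treated as retriable, so in practice you would cap its size and report non-transient errors to the caller instead of queueing them.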

Is the issue resolved? Any solution would be appreciated.

@hyperlink I tried to catch the "BrokerNotAvailableError: Could not find the leader" error using a Catch node but couldn't. Can you please tell me how to catch the error and resend the message to the Kafka producer?

@hyperlink Is this not something that would make sense to be handled by this library? Seems like the interface could be greatly simplified if the library were to simply handle this issue, rather than push that logic up into the consumer?

We're seeing this in our client that we have here at Fanatics and we _think_ we've implemented a solution for now, but would love to know if it would make sense to eventually have kafka-node doing this "automagically" for us.

Hello @hyperlink, I am wondering whether we can pass an argument that forces the socket keep-alive option to be used when connecting.

While debugging this kind of problem, I found a case where the Node library believed the connection was fine, whereas netstat did not show the expected socket connected to the broker (which somewhat sidesteps your assertion that the library will reconnect when the connection is broken).

Another suspicious thing is that the library uses socket.end(), which does a one-sided close (sending a FIN packet on the TCP connection). Has calling end() and then destroy() been considered?

@vivekpathak we could add a feature where setting the idleConnection option to false disables the idle disconnection. The broker might still disconnect you based on its own idle setting, but the socket will simply reconnect.

For the initial connection we do call socket.end and .destroy, in that order. For the idle disconnect, however, this is not done (likely because it's triggered on the end/close event). It seems harmless to call end and destroy there as well.

PRs are welcome!
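Both socket-level ideas from this exchange can be illustrated with Node's own net module: `setKeepAlive` asks the operating system to probe an idle connection, and calling `end()` followed by `destroy()` ensures the socket is released even if the peer never closes its side. This is a sketch using plain `net` sockets, not a patch to kafka-node; `connectWithKeepAlive`, `closeSocket` and the 60-second default delay are hypothetical.

```javascript
'use strict';
const net = require('net');

// Hypothetical helper: opens a TCP connection with keep-alive enabled, so the
// OS starts sending probes after `keepAliveDelayMs` of inactivity and a peer
// that vanished silently can eventually be detected as an error.
function connectWithKeepAlive(host, port, keepAliveDelayMs = 60000) {
  const socket = net.createConnection({ host, port });
  socket.setKeepAlive(true, keepAliveDelayMs);
  return socket;
}

// Hypothetical helper: end() half-closes the socket (sends FIN) and would wait
// for the peer; destroy() then releases it immediately, so the socket does not
// linger if the peer never closes its side.
function closeSocket(socket) {
  socket.end();
  socket.destroy();
}
```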

I got a "This socket has been ended by the other party" error in one of my production environments. I am not sure whether the kafka-node library should emit this error. My other environments running the same code have not hit it.

From the answer in https://github.com/websockets/ws/issues/704, "Typically this is sent when another application (client, browser, etc.) closes the socket when you are still sending data to it from the server".

I don't know whether the kafka-node library closes the socket while the Kafka client is producing data.

The version of kafka-node is 2.2.3.

Any ideas?

> I got "This socket has been ended by the other party" error in one of my production environments.

Did this come from kafka-node? We pass any socket errors on to the socket_error event on the KafkaClient. I haven't seen that one before.
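To tell whether an error like this is coming from kafka-node, one option is to log each client event together with its name. A minimal sketch: `attachErrorLogging` is a hypothetical helper, the event names `error` and `socket_error` are taken from this thread and should be verified against your kafka-node version, and `client` can be any EventEmitter. Registering an `error` listener also matters in its own right, because Node throws when an EventEmitter emits `error` with no listener attached.

```javascript
'use strict';

// Hypothetical helper: logs every listed event with its name, so errors from
// the Kafka client can be told apart from other application errors.
function attachErrorLogging(client, log = console.error, events = ['error', 'socket_error']) {
  events.forEach(name => {
    client.on(name, error => {
      const text = error && error.message ? error.message : String(error);
      log(`[kafka ${name}] ${text}`);
    });
  });
  return client;
}
```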
