Kafka-node: ConsumerGroup reconnect problem on kafka broker restart

Created on 15 Oct 2017  ·  5 Comments  ·  Source: SOHU-Co/kafka-node

Bug Report

If one of the brokers crashes or is shut down (for example with Ctrl+C), the kafka-node client fails to reconnect. The issue probably occurs at this point:

kafka-node:Client refresh metadata error undefined +3ms

After this line, TimeoutError: Request timed out after 30000ms is passed to the error event handler. The ConsumerGroup does not recover, and the Node process has to be restarted.

Environment

  • Node version: 7.8.0
  • Kafka-node version: 2.2.3
  • Kafka version: 0.11.0.x

For specific cases also provide

  • Number of Brokers: 2 / 3
  • Number partitions for topic: 8 / 18

Include Sample Code to reproduce behavior

// default code with default configurations as per documentation

Include output with Debug turned on

  kafka-node:KafkaClient clearing 10.10.2.124:9093 callback queue without error +37s
  kafka-node:KafkaClient clearing 10.10.2.124:9093 callback queue without error +1ms
  kafka-node:KafkaClient found 1 connected broker(s) +21ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +134ms
  kafka-node:KafkaClient found 1 connected broker(s) +41ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +11ms
  kafka-node:KafkaClient found 1 connected broker(s) +88ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +5ms
  kafka-node:KafkaClient found 1 connected broker(s) +96ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +6ms
  kafka-node:KafkaClient found 1 connected broker(s) +95ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +4ms
  kafka-node:KafkaClient found 1 connected broker(s) +98ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +5ms
  kafka-node:KafkaClient found 1 connected broker(s) +98ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +14ms
  kafka-node:KafkaClient found 1 connected broker(s) +88ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +8ms
  kafka-node:KafkaClient found 1 connected broker(s) +95ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +36ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 10.10.2.124:9093 +47ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 10.10.2.124:9093 +12ms
  kafka-node:KafkaClient found 1 connected broker(s) +9ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +13ms
  kafka-node:KafkaClient found 1 connected broker(s) +91ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +14ms
  kafka-node:KafkaClient found 1 connected broker(s) +90ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +31ms
  kafka-node:KafkaClient found 1 connected broker(s) +70ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +6ms
  kafka-node:KafkaClient found 1 connected broker(s) +97ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +12ms
  kafka-node:KafkaClient found 1 connected broker(s) +90ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +9ms
  kafka-node:KafkaClient found 1 connected broker(s) +92ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +61ms
  kafka-node:KafkaClient found 1 connected broker(s) +43ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +25ms
  kafka-node:KafkaClient found 1 connected broker(s) +75ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +4ms
  kafka-node:KafkaClient found 1 connected broker(s) +110ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +15ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 10.10.2.124:9093 +37ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 10.10.2.124:9093 +9ms
  kafka-node:KafkaClient found 1 connected broker(s) +42ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +27ms
  kafka-node:KafkaClient found 1 connected broker(s) +76ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +15ms
  kafka-node:KafkaClient found 1 connected broker(s) +89ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +13ms
  kafka-node:KafkaClient found 1 connected broker(s) +96ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +68ms
  kafka-node:KafkaClient found 1 connected broker(s) +35ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +10ms
  kafka-node:KafkaClient found 1 connected broker(s) +91ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +7ms
  kafka-node:KafkaClient found 1 connected broker(s) +95ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +8ms
  kafka-node:KafkaClient found 1 connected broker(s) +93ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +9ms
  kafka-node:KafkaClient found 1 connected broker(s) +93ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +6ms
  kafka-node:KafkaClient found 1 connected broker(s) +101ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +4ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 10.10.2.124:9093 +13ms
  kafka-node:KafkaClient kafka-node-client reconnecting to 10.10.2.124:9093 +28ms
  kafka-node:KafkaClient found 1 connected broker(s) +94ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +54ms
  kafka-node:KafkaClient found 1 connected broker(s) +49ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +8ms
  kafka-node:KafkaClient found 1 connected broker(s) +94ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +6ms
  kafka-node:KafkaClient found 1 connected broker(s) +96ms
KAFKA: error => BrokerNotAvailableError: Could not find the leader
  kafka-node:KafkaClient updating metadatas +6ms
  kafka-node:ConsumerGroup brokersChanged refreshing metadata +203ms
  kafka-node:Client refresh metadata currentAttempt 1 +1ms
  kafka-node:Client refresh metadata currentAttempt 2 +203ms
  kafka-node:Client refresh metadata currentAttempt 3 +402ms
  kafka-node:Client refresh metadata currentAttempt 4 +804ms
  kafka-node:Client refresh metadata currentAttempt 5 +1s
  kafka-node:Client refresh metadata currentAttempt 6 +1s
  kafka-node:Client refresh metadata currentAttempt 7 +1s
  kafka-node:Client refresh metadata currentAttempt 8 +1s
  kafka-node:Client refresh metadata currentAttempt 9 +1s
  kafka-node:Client refresh metadata currentAttempt 10 +1s
  kafka-node:Client refresh metadata currentAttempt 11 +1s
  kafka-node:Client refresh metadata error undefined +3ms
KAFKA: error => TimeoutError: Request timed out after 30000ms

All 5 comments

Workaround:

setInterval(() => {
  try {
    kafkaConsumerGroup.client.refreshMetadata(topics, (error) => {
      if (error != null) {
        console.log(`KAFKA-CLIENT-REFRESH-METADATA-ERROR: ${error}`);
      }
    });
  } catch (err) {
    console.log(`KAFKA-CLIENT-REFRESH-METADATA-ERROR: unexpected: ${err}`);
  }
}, 30000);

Thanks @muradm, this bit us too.

I did a slightly different workaround:

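      // `log` and `groupName` come from the surrounding application code
      consumerGroup.on('error', (err) => {
        log.error(`error on consumerGroup ${groupName}`);
        log.error(err);
        // handle a broker not available error
        if (err && err.name === "BrokerNotAvailableError") {
          log.error("attempting reconnect");
          // ask the client to re-fetch metadata for the subscribed topics
          consumerGroup.client.refreshMetadata(consumerGroup.topics, (refreshErr) => {
            // handle errors here
            if (refreshErr) log.error(refreshErr);
          });
        } else if (err) {
          log.error(err.stack);
        }
      });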

Your solution is definitely cleaner than my amateur one :)

> Your solution is definitely cleaner

But I wouldn't have thought of how to do it without your first suggestion. We helped each other. Thank you.

Helped me out too thanks!!
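
In the debug output above, BrokerNotAvailableError is emitted roughly every 100 ms, so the error-handler workaround would call refreshMetadata once per error. A minimal sketch of one way to rate-limit that, assuming the same consumerGroup.client.refreshMetadata(topics, callback) call used in the comments above; the names attachMetadataRefresh, minIntervalMs, now and onResult are hypothetical and not part of kafka-node:

```javascript
// Sketch only: attachMetadataRefresh and its options are hypothetical helper names.
// It relies solely on consumerGroup.on('error', ...), consumerGroup.topics and
// consumerGroup.client.refreshMetadata(topics, cb), as used in the workarounds above.
function attachMetadataRefresh(consumerGroup, options = {}) {
  // Minimum time between two refresh attempts (assumed default: 5 seconds).
  const minIntervalMs =
    options.minIntervalMs !== undefined ? options.minIntervalMs : 5000;
  // Clock is injectable so the throttle can be exercised without real timers.
  const now = options.now || Date.now;

  let inFlight = false;         // true while a refresh has not called back yet
  let lastAttempt = -Infinity;  // timestamp of the most recent refresh attempt

  consumerGroup.on('error', (err) => {
    // Only react to the error seen in the log; other errors are left to
    // whatever other 'error' listeners the application registers.
    if (!err || err.name !== 'BrokerNotAvailableError') return;

    // Skip if a refresh is already running or one was started too recently.
    if (inFlight || now() - lastAttempt < minIntervalMs) return;

    inFlight = true;
    lastAttempt = now();
    consumerGroup.client.refreshMetadata(consumerGroup.topics, (refreshErr) => {
      inFlight = false;
      // Report the outcome (null on success) to the optional callback.
      if (options.onResult) options.onResult(refreshErr || null);
    });
  });
}
```

It could be attached once after creating the consumer group, for example attachMetadataRefresh(consumerGroup, { minIntervalMs: 5000, onResult: (e) => e && console.error(e) }). Whether this is enough to recover in every broker-restart scenario is not established by this thread; it only reduces redundant refresh calls.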
