Kafka-node: Error Handling Kafka Server Failure

Created on 8 Feb 2017 · 12 Comments · Source: SOHU-Co/kafka-node

kafka-node: v1.3.4

So I am trying to handle the case when the connection to Kafka becomes interrupted and a connection is no longer possible. Whether it be an internet failure, power outage, or whatever, I would like to handle this case gracefully.

So far I have been unsuccessful in catching this error. It throws a critical error, which I cannot seem to catch, that kills the consumer script. To make it more complicated, it only kills the script about 80-90% of the time when the Kafka server dies.

Here is the error I get when I manually terminate the kafka server:

events.js:160
      throw er; // Unhandled 'error' event
      ^

Error: connect ECONNREFUSED 127.0.1.1:9092
    at Object.exports._errnoException (util.js:1022:11)
    at exports._exceptionWithHostPort (util.js:1045:20)
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1087:14)

Here is the exact test script I'm using to test:

try {
  let kafka = require('kafka-node');
  let fs = require('fs');
  let client = new kafka.Client('localhost:2181');
  let Consumer = kafka.Consumer;

  client.on('error', function(error)
  {
    console.log('client error', error);
  });

  let consumer = new Consumer(
    client,
    [
      {topic: 'test', partition: 0}
    ],
    {
      groupId: 'test-group',
      autoCommit: true
    }
  );


  consumer.on('message', function(message)
  {
    console.log('message', message);
  });

  consumer.on('error', function(error)
  {
    console.log('error', error);
  });
} catch (error)
{
  console.log('critical fail', error);
}

Messages come through cleanly, but I cannot seem to catch this error.

Additionally if I restart the consumer script while the kafka server is down it will handle the failed connection gracefully 100% of the time saying:

error { BrokerNotAvailableError: Broker not available
    at new BrokerNotAvailableError (/PathToProject/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
    at Client.loadMetadataForTopics (/PathToProject/node_modules/kafka-node/lib/client.js:348:15)
    at RetryOperation._fn (/PathToProject/node_modules/kafka-node/lib/client.js:488:12)
    at Timeout._onTimeout (/PathToProject/node_modules/retry/lib/retry_operation.js:68:10)
    at ontimeout (timers.js:365:14)
    at tryOnTimeout (timers.js:237:5)
    at Timer.listOnTimeout (timers.js:207:5) message: 'Broker not available' }

This is good, handled as expected; however, if you then restart the Kafka server after the consumer has started, about 90% of the time the consumer will run through every single offset again.

I have also tried to manually commit the offset using this code:

try {
  let kafka = require('kafka-node');
  let fs = require('fs');

  let client = new kafka.Client('localhost:2181');
  let Consumer = kafka.Consumer;

  let  offset = new kafka.Offset(client),
    consumer = new Consumer(
      client,
      [
        {topic: 'test', partition: 0}
      ],
      {
        groupId: 'test-group',
        autoCommit: false
      }
    );


  consumer.on('message', function(message)
  {
    console.log('message', message);

    offset.commit('test-group', [{
      topic: 'test',
      partition: 0,
      offset: message.offset
    }]);
  });

  consumer.on('error', function(error)
  {
    console.log('error', error);
  });
} catch (error)
{
  console.log('critical fail', error);
}

All 12 comments

I would recommend using either ConsumerGroup (0.9+) or HighLevelConsumer. Unless you need to consume from a specific partition, both of these will automatically save consumed offsets for you (provided they are in the same group), and they have been tested to recover from network issues fairly well. If there is a catastrophic failure such as power loss, the worst that could happen is that you double consume the few offsets that weren't committed in time. Good luck.

Ok, I tried to convert to consumerGroup and got messages to flow as expected using this code:

try {
  let kafka = require('kafka-node');

  let options = {
    host: "localhost:2181",
    groupId: 'test-group',
    protocol: ['roundrobin'],
    fromOffset: 'latest',
    migrateHLC: false
  };

  let consumerGroup = new kafka.ConsumerGroup(options, ['test']);


  consumerGroup.on('message', function(message)
  {
    console.log('message', message);

  });

  consumerGroup.on('error', function(error)
  {
    console.log('error', error);
  });
} catch (error)
{
  console.log('critical fail', error);
}

However, when the Kafka server dies, most often I get this error and the consumerGroup fails critically with a seemingly uncatchable error.

events.js:160
      throw er; // Unhandled 'error' event
      ^

Error: connect ECONNREFUSED 127.0.1.1:9092
    at Object.exports._errnoException (util.js:1022:11)
    at exports._exceptionWithHostPort (util.js:1045:20)
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1087:14)

Also if you restart the consumerGroup while kafka is still down then it will run through every message again starting from the beginning. This happens about 90% of the time.

There's no need to kill your consumer since there's logic in the client to retry connecting the socket on an interval when it disconnects. So network connection errors are safe to ignore. If the broker changes or goes away forever it will attempt to refresh the broker list and pick up where it left off from the other broker.

In your test try leaving the consumer alone and then restarting the broker and see if it recovers.

So I am not explicitly stopping the consumer, the uncaught error stops it. If the error could be caught I could go ahead and ignore it.

Kafka version: 0.10.0.1

To expand on how I am testing:

Start kafka:

bin/kafka-server-start.sh config/server.properties

Start consumer or consumerGroup

node /pathToConsumers/consumer.js
or
node /pathToConsumers/consumerGroup.js

Send a message:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
test

Consumer then dumps the message as expected.

Return to server terminal and hit ctrl-c to terminate the server, simulating a kafka server failure.

Then most of the time the consumer, with the code I've pasted above, will throw an uncaught exception that terminates it. It does not happen every time, which makes this difficult.

Hmm it shouldn't be throwing uncaught errors. Your code that listens to the error event on the consumer should handle it. If that's not the case you can add a listener on process to prevent it from crashing.

https://nodejs.org/api/process.html#process_event_uncaughtexception

Thanks for your help so far btw.

Adding:

process.on('uncaughtException', (err) => {
  console.log('uncaught', err);
});

works in catching errors in the consumer script.

For some reason I can no longer get the ConsumerGroup to fail completely when the Kafka server is terminated. When I restart and then terminate the Kafka server, sometimes it dumps the error through consumerGroup.on('error', cb), sometimes it does nothing in response to the Kafka failure, and sometimes it dumps multiple errors at once (a maximum of 4 so far).

After an hour of not running any more debugging, it now always displays at least 2 error messages on Kafka failure, which is very confusing. This is the error that, when it does show, appears between 1 and 4 times on Kafka failure:

EDIT: this error is catchable through the on error listener.

error { Error: connect ECONNREFUSED 127.0.1.1:9092
    at Object.exports._errnoException (util.js:1022:11)
    at exports._exceptionWithHostPort (util.js:1045:20)
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1087:14)
  code: 'ECONNREFUSED',
  errno: 'ECONNREFUSED',
  syscall: 'connect',
  address: '127.0.1.1',
  port: 9092 }

Perhaps earlier I mistook the consumer.js script for the consumerGroup.js script... I don't know. Unfortunately a lot of things are inconsistent, as even the consumer does not fail every time.

So the consumerGroup seems catchable at least, just that the on error event seems to only work occasionally.

What you're seeing makes sense.

When Kafka initially goes offline it will cause the client to reconnect at an interval. It takes a moment for TCP to realize it can't make the connection, at which point it throws the ECONNREFUSED error. It will keep doing this until the ephemeral node in zookeeper that represents the kafka broker times out (around 30 seconds?). Once it is gone the client realizes this and will stop trying to connect to the broker, and those errors stop.

When kafka comes back up again it will register with zookeeper, and the client should pick up the broker again, reconnect, and continue to consume.

Ok, I can see that then. There is still the issue that the consumerGroup sometimes does not bubble up errors when kafka is terminated. Is this expected?

Also, perhaps the documentation for the base consumer (as in require('kafka-node').Consumer) should be changed to reflect that on kafka failure it frequently throws an uncaught exception that will need to be caught with process.on('uncaughtException', cb). I can get this to fail in this manner rather consistently.

I haven't been able to reproduce the uncaught exception. Could you give more details about your environment?

Here's what I'm doing.

  1. ./start-docker
  2. node examples/consumerGroupMember.js
  3. wait till all consumers join the group
  4. docker-compose stop kafka
  5. wait a bit
  6. docker-compose start kafka

Ah let me clarify.

For the consumerGroup there seems to be no issue with uncaught exceptions. I must have accidentally run my consumer script, which does throw uncaught exceptions.

So to reproduce uncaught exceptions:

Use code:

try {
  let kafka = require('kafka-node');

  let client = new kafka.Client('localhost:2181');
  let Consumer = kafka.Consumer;

  let  offset = new kafka.Offset(client),
    consumer = new Consumer(
      client,
      [
        {topic: 'test', partition: 0}
      ],
      {
        groupId: 'test-group',
        autoCommit: false
      }
    );


  consumer.on('message', function(message)
  {
    console.log('message', message);

    offset.commit('test-group', [{
      topic: 'test',
      partition: 0,
      offset: message.offset
    }]);
  });

  consumer.on('error', function(error)
  {
    console.log('error', error);
  });
} catch (error)
{
  console.log('critical fail', error);
}

Start Kafka.

bin/kafka-server-start.sh config/server.properties

Start consumer:

node consumer.js

Stop Kafka by hitting ctrl-c in Kafka server terminal.

Most of the time the consumer.js script will throw an uncaught exception, but not every time, so you may need to run it several times.

The uncaught exception is probably coming from the offset. Add an error handler to it and let me know.

So in Node, any EventEmitter that emits an 'error' event while no listener is registered for it will throw that error, and it ends up as an uncaught exception.
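For anyone who wants to see that behavior in isolation, here is a minimal sketch using only Node's built-in events module. It does not use kafka-node at all: the two emitters and the emitError helper are made up for illustration, and the error message is just a placeholder.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: emits an 'error' event and reports what happened.
// emit() runs listeners synchronously, so the try/catch here can observe
// the throw. In the scripts above the emit happens later, inside a socket
// callback, which is why their try/catch never sees it.
function emitError(emitter) {
  try {
    emitter.emit('error', new Error('connect ECONNREFUSED'));
    return 'handled by listener';
  } catch (err) {
    return 'thrown: ' + err.message;
  }
}

// No 'error' listener registered: emit() throws the error itself.
const withoutListener = new EventEmitter();

// With an 'error' listener: the error is passed to the listener instead.
const withListener = new EventEmitter();
withListener.on('error', (err) => console.log('error', err.message));

const resultWithout = emitError(withoutListener);
const resultWith = emitError(withListener);

console.log(resultWithout);
console.log(resultWith);
```

The emitter without a listener throws, while the one with a listener just logs the error and carries on.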

Well there it is, adding an offset.on('error', cb) catches the previously uncaught error. So for every object that you use in the kafka-node library (or any code that uses error events) you should also include an on error listener to catch any errors it might throw.
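A rough sketch of that rule of thumb, in case it helps: a small helper that attaches an error listener to every emitter you create. The attachErrorHandlers name is made up, and the client, consumer and offset objects below are plain EventEmitter stand-ins (not real kafka-node objects) so the snippet runs without a broker.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: registers an 'error' listener on every emitter in
// the map, so that none of them can crash the process with an unhandled
// 'error' event.
function attachErrorHandlers(emitters, onError) {
  for (const [name, emitter] of Object.entries(emitters)) {
    emitter.on('error', (err) => onError(name, err));
  }
}

// Stand-ins for the kafka-node Client, Consumer and Offset instances.
const client = new EventEmitter();
const consumer = new EventEmitter();
const offset = new EventEmitter();

const seen = [];
attachErrorHandlers({ client, consumer, offset }, (name, err) => {
  seen.push(name + ': ' + err.message);
  console.log(name, 'error', err.message);
});

// Simulate the broker going away while the offset object is in use.
offset.emit('error', new Error('connect ECONNREFUSED 127.0.1.1:9092'));
```

In the reproducing script above, the same idea would mean passing the real client, consumer and offset objects to the helper right after they are created, or simply calling .on('error', cb) on each of them.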

So I guess really there is no bug, I just did not fully understand how to catch the errors in this instance. I will go ahead and close this, but perhaps it will help someone else down the line who may be confused.

Thanks for your help.
