Kafka-node: Consumer group still receives timeout error even after it is closed

Created on 23 Nov 2017  路  7Comments  路  Source: SOHU-Co/kafka-node

Bug Report

It seems that the event emitter is not properly removed even the close() is called. What happened to me was that I had only one Kafka broker running, then I stopped the broker when consumer group was connected to it and before the close() function was called. Then I got some error messages saying that "refreshBrokerMetadata failed" and "Unable to find available brokers to try" (which is how it is supposed to be and reasonable), however after 30000ms I got another error message saying that "Request timed out after 30000ms".

Environment

  • Node version: v7.10.1
  • Kafka-node version: 2.3.0
  • Kafka version: 0.11.0.1
  • Number of Brokers: 1
  • Number partitions for topic: 8

Include Sample Code to reproduce behavior

var Kafka = require('kafka-node')
var ConsumerGroup = Kafka.ConsumerGroup
var consumerOptions = {
  kafkaHost: '127.0.0.1:9092',
  groupId: 'ExampleTestGroup',
  sessionTimeout: 15000,
  autoCommit: true,
  protocol: ['roundrobin'],
  fromOffset: 'earliest',
  outOfRangeOffset: 'earliest'
}
var topics = ['test']
var consumerGroup = new ConsumerGroup(consumerOptions, topics)
consumerGroup.on('error', (error) => console.log('error: ', error))
consumerGroup.on('connect', () => console.log('connected'))
consumerGroup.on('rebalancing', () => console.log('rebalancing: '))
consumerGroup.on('rebalanced', () => {
    console.log('rebalanced')
    // Before this is called, I manually shutdown the Kafka service
    setTimeout(() => {
        consumerGroup.close(() => console.log('consumer group is closed'))
    }, 10000)
})
consumerGroup.on('message', msg => console.log('got message: ', msg))

Output

error:  { NestedError: refreshBrokerMetadata failed
    at async.waterfall (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:301:35)
    at /home/Kafka-node/node_modules/async/dist/async.js:473:16
    at next (/home/Kafka-node/node_modules/async/dist/async.js:5315:29)
    at /home/Kafka-node/node_modules/async/dist/async.js:958:16
    at KafkaClient.getAvailableBroker (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:255:12)
    at async.waterfall.callback (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:296:23)
    at nextTask (/home/Kafka-node/node_modules/async/dist/async.js:5310:14)
    at Object.waterfall (/home/Kafka-node/node_modules/async/dist/async.js:5320:5)
    at KafkaClient.refreshBrokerMetadata (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:295:9)
    at KafkaClient.Client.sendGroupRequest (/home/Kafka-node/node_modules/kafka-node/lib/client.js:295:10)
Caused By: Error: Unable to find available brokers to try
    at KafkaClient.getAvailableBroker (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:255:21)
    at async.waterfall.callback (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:296:23)
    at nextTask (/home/Kafka-node/node_modules/async/dist/async.js:5310:14)
    at Object.waterfall (/home/Kafka-node/node_modules/async/dist/async.js:5320:5)
    at KafkaClient.refreshBrokerMetadata (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:295:9)
    at KafkaClient.Client.sendGroupRequest (/home/Kafka-node/node_modules/kafka-node/lib/client.js:295:10)
    at KafkaClient.Client.sendLeaveGroupRequest (/home/Kafka-node/node_modules/kafka-node/lib/client.js:320:8)
    at ConsumerGroup.leaveGroup (/home/Kafka-node/node_modules/kafka-node/lib/consumerGroup.js:557:17)
    at /home/Kafka-node/node_modules/kafka-node/lib/consumerGroup.js:619:14
    at /home/Kafka-node/node_modules/async/dist/async.js:3866:24
  nested:
   Error: Unable to find available brokers to try
       at KafkaClient.getAvailableBroker (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:255:21)
       at async.waterfall.callback (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:296:23)
       at nextTask (/home/Kafka-node/node_modules/async/dist/async.js:5310:14)
       at Object.waterfall (/home/Kafka-node/node_modules/async/dist/async.js:5320:5)
       at KafkaClient.refreshBrokerMetadata (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:295:9)
       at KafkaClient.Client.sendGroupRequest (/home/Kafka-node/node_modules/kafka-node/lib/client.js:295:10)
       at KafkaClient.Client.sendLeaveGroupRequest (/home/Kafka-node/node_modules/kafka-node/lib/client.js:320:8)
       at ConsumerGroup.leaveGroup (/home/Kafka-node/node_modules/kafka-node/lib/consumerGroup.js:557:17)
       at /home/Kafka-node/node_modules/kafka-node/lib/consumerGroup.js:619:14
       at /home/Kafka-node/node_modules/async/dist/async.js:3866:24 }
  kafka-node:ConsumerGroup Leave group failed with { BrokerNotAvailableError
    at new BrokerNotAvailableError (/home/Kafka-node/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
    at KafkaClient.Client.sendGroupRequest (/home/Kafka-node/node_modules/kafka-node/lib/client.js:296:15)
    at KafkaClient.Client.sendLeaveGroupRequest (/home/Kafka-node/node_modules/kafka-node/lib/client.js:320:8)
    at ConsumerGroup.leaveGroup (/home/Kafka-node/node_modules/kafka-node/lib/consumerGroup.js:557:17)
    at /home/Kafka-node/node_modules/kafka-node/lib/consumerGroup.js:619:14
    at /home/Kafka-node/node_modules/async/dist/async.js:3866:24
    at replenish (/home/Kafka-node/node_modules/async/dist/async.js:998:17)
    at iterateeCallback (/home/Kafka-node/node_modules/async/dist/async.js:983:17)
    at /home/Kafka-node/node_modules/async/dist/async.js:958:16
    at /home/Kafka-node/node_modules/async/dist/async.js:3871:13 message: 'Broker not available' } +1ms
  kafka-node:KafkaClient close client +0ms
consumer group is closeed
  kafka-node:ConsumerGroup Connect ignored. Consumer closed. +925ms
error:  { TimeoutError
    at new TimeoutError (/home/Kafka-node/node_modules/kafka-node/lib/errors/TimeoutError.js:6:9)
    at Timeout.setTimeout [as _onTimeout] (/home/Kafka-node/node_modules/kafka-node/lib/kafkaClient.js:384:14)
    at ontimeout (timers.js:386:14)
    at tryOnTimeout (timers.js:250:5)
    at Timer.listOnTimeout (timers.js:214:5) message: 'Request timed out after 30000ms' }

Most helpful comment

Any update on this, I am facing the same issue.

All 7 comments

I hope this is fixed with the latest version are you still seeing this issue?

I'm having this issue as well. The connection is being closed when I call close but sometimes errors are being emitted.

I don't think anything would of changed to fix this lately.

Would just need to avoid emitting errors once the connection is closed.

Any update on this, I am facing the same issue.

Idem.

Am facing this issue in "kafka-node": "^4.1.3"

facing same issue with below configuration
kafka node- "4.1.1"
node version - v12.16.1

Was this page helpful?
0 / 5 - 0 ratings