Kafka-node: Broker not available (sendRequest) after producer ready. Only sends messages within the ready event

Created on 21 Sep 2018  Â·  9Comments  Â·  Source: SOHU-Co/kafka-node

Questions?

My producer only sends messages on ready event. When trying to send a new message, I get a BrokerNotAvailableError { message: 'Broker not available (sendRequest)'

Bug Report

Environment

  • Node version: 8.12.0
  • Kafka-node version: 3.0.1
  • Kafka version: 2.0.0

For specific cases also provide

  • Number of Brokers: 1
  • Number partitions for topic: 1

Include Sample Code to reproduce behavior

class MyKafkaClient {
    private readonly kafkaNodeClient: Kafka.KafkaClient
    private producer: Kafka.Producer
    private producerInitializing = false
    private producerReady = false
    private sendQueue = []
  public enqueue(topic: string, data: any, cb: EnqueueCallback): void {

    if (this.producerReady) {
      const payload = [{
        topic,
        messages: JSON.stringify(data)
      }]
      return this.producer.send(payload, (err, data) => {
        if (err) {
          console.log('err', err)
          this.kafkaNodeClient.refreshMetadata([topic], (err) => {
            console.log('refreshMetadata err', err)

            return this.enqueue(topic, data, options, cb)
          })
        }
      })
    }

    this.sendQueue.push({ topic, data, options, cb })

    if (this.producerInitializing) {
      return
    }

    this.initProducer()
  }

  private initProducer = () => {
    this.producerInitializing = true
    const producerOptions: Kafka.ProducerOptions = {
      requireAcks: 1,
      ackTimeoutMs: 300,
      partitionerType: 2,
    }
    this.producer = new Kafka.Producer(this.kafkaNodeClient, producerOptions)
    this.producer.on('ready', this.onProducerReady)
    this.producer.on('error', (err) => this.onProducerError(err))
  }

  private onProducerReady = () => {
    console.log('ready')

    this.producerInitializing = false
    this.producerReady = true
    for (const queueItem of this.sendQueue) {
      const payload = {
        topic: queueItem.topic,
        messages: JSON.stringify(queueItem.data)
      }
      console.log('payload', payload)
      this.producer.send([payload], queueItem.cb)
    }
    this.sendQueue = []
  }

  private onProducerError = (err: Error) => {
    console.error(err)
  }
    const kafka = new MyKafkaClient() //using default conf
    const topic = 'test-only'
    kafka.enqueue(topic, { 'data': 'not ready yet' }, () => { })
    setTimeout(() => {
      const data = [1, 2, 3, 4, 5]
      for (let item of data) {
        kafka.enqueue(topic, { 'data': item }, () => { })
      }
      kafka.close(done)
    }, 300)

Include output with Debug turned on

  kafka-node:KafkaClient Connect attempt 1 +0ms
  kafka-node:KafkaClient Trying to connect to host: localhost port: 9092 +4ms
  kafka-node:KafkaClient Sending versions request to localhost:9092 +16ms
  kafka-node:KafkaClient broker socket connected {"host":"localhost","port":"9092"} +2ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +2ms
  kafka-node:KafkaClient Received versions response from localhost:9092 +11ms
  kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"32":{"min":0,"max":1,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":false},"deleteTopics":{"min":0,"max":1,"usable":false},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
  kafka-node:KafkaClient broker is now ready +1ms
  kafka-node:KafkaClient updating metadatas +56ms
  kafka-node:KafkaClient compressing messages if needed +13ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +6ms
  kafka-node:KafkaClient Sending versions request to 10.200.193.213:9092 +1ms
  kafka-node:KafkaClient Received versions response from 10.200.193.213:9092 +4ms
  kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"32":{"min":0,"max":1,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":false},"deleteTopics":{"min":0,"max":1,"usable":false},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
  kafka-node:KafkaClient broker is now ready +20ms
  console.log src/models/OOKafkaClient.ts:31
    kafka client connect

  console.log src/models/OOKafkaClient.ts:115
    ready

  console.log src/models/OOKafkaClient.ts:125
    payload { topic: 'test-only', messages: '{"data":"not ready yet"}' }

  console.log src/models/OOKafkaClient.ts:31
    kafka client connect

  kafka-node:KafkaClient compressing messages if needed +182ms
  kafka-node:KafkaClient compressing messages if needed +2ms
  kafka-node:KafkaClient compressing messages if needed +1ms
  kafka-node:KafkaClient compressing messages if needed +1ms
  kafka-node:KafkaClient compressing messages if needed +0ms
  kafka-node:KafkaClient close client +1ms
  kafka-node:KafkaClient clearing 10.200.193.213:9092 callback queue without error +36ms
  kafka-node:KafkaClient clearing localhost:9092 callback queue without error +1ms
  kafka-node:Client refresh metadata currentAttempt 1 +3ms
  kafka-node:Client refresh metadata currentAttempt 1 +4ms
  kafka-node:Client refresh metadata currentAttempt 1 +3ms
  kafka-node:Client refresh metadata currentAttempt 1 +3ms
  kafka-node:Client refresh metadata currentAttempt 1 +2ms

Most helpful comment

It's happening for me too. I'm adding retry to my overly optimistic code.

All 9 comments

Hi Alex,

Did you find the cause of this? I am facing the same issue.

Thanks,
Shashank

Hi,
I am facing the same issue.. Once I start the producer, it pushes messages to kafka for some time. After a certain time, it throws the above error.
The listener is working fine.
kafka-node : version : 4.1.1
node : version : 10.15.3 (lts)

TIA

Hi,
I am facing the same issue. just sent the message normally for a while. after a while, there will be an error like "Broker not available (sendRequest)".
kafka-node version: 4.1.3
node version: 8.10.0
kafka version: 2.0.0
zookeeper version: 3.4.13

@rohit-rapido and @jimluo168 turn on debugging may give a clue as to why this happens.

I've got the same exact problem. Nothing in the debug logs suggests that there's a problem.

EDIT: figured it out. I was closing the Consumer which was inadvertently closing the same kafkaClient that the Producer was using.

I am also facing the same issue.
Node: 6.12.0
Kafka-node: 4.1.3
Error: "Broker not available (sendRequest)"

I am attaching the sample code which manages the connection with Kafka.
(I am using sendMessage() function to send event in Kafka topic)
https://gist.github.com/amolbarewar/06de1f5695bd8c4bde56cfa3c656f366

+1
Node: 8.16.1
kafka-node: 4.1.3
Error: Broker not available (sendRequest)

For me, it seems like the error coincides with a rebalance, but it occurs on producing which doesn't make any sense to me.

@hyperlink I am facing the same issue, it happens randomly on my product environment and my node verison: v8.9.4ï¼› kafka version :v2.0.0 ; kafka-node version : v4.1.3;
Now, my solution is retry send the message. But i still wanna now is this a bug?

It's happening for me too. I'm adding retry to my overly optimistic code.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

chetandev picture chetandev  Â·  5Comments

AnnisaNurika picture AnnisaNurika  Â·  5Comments

twawszczak picture twawszczak  Â·  6Comments

cesaraugustogarcia picture cesaraugustogarcia  Â·  3Comments

ghinks picture ghinks  Â·  6Comments