My producer only sends messages on the ready event. When I try to send a new message after that, I get BrokerNotAvailableError { message: 'Broker not available (sendRequest)' }
import * as Kafka from 'kafka-node'

class MyKafkaClient {
  // The constructor that creates kafkaNodeClient and the close() method are not shown in this excerpt.
  private readonly kafkaNodeClient: Kafka.KafkaClient
  private producer: Kafka.Producer
  private producerInitializing = false
  private producerReady = false
  private sendQueue: Array<{ topic: string, data: any, cb: EnqueueCallback }> = []

  public enqueue(topic: string, data: any, cb: EnqueueCallback): void {
    if (this.producerReady) {
      const payload = [{
        topic,
        messages: JSON.stringify(data)
      }]
      // The send result is named `result` so it does not shadow the outer `data` used by the retry
      return this.producer.send(payload, (err, result) => {
        if (err) {
          console.log('err', err)
          return this.kafkaNodeClient.refreshMetadata([topic], (refreshErr) => {
            console.log('refreshMetadata err', refreshErr)
            this.enqueue(topic, data, cb)
          })
        }
        cb(null, result)
      })
    }
    // Producer not ready yet: queue the message and start the producer once
    this.sendQueue.push({ topic, data, cb })
    if (this.producerInitializing) {
      return
    }
    this.initProducer()
  }

  private initProducer = () => {
    this.producerInitializing = true
    const producerOptions: Kafka.ProducerOptions = {
      requireAcks: 1,
      ackTimeoutMs: 300,
      partitionerType: 2,
    }
    this.producer = new Kafka.Producer(this.kafkaNodeClient, producerOptions)
    this.producer.on('ready', this.onProducerReady)
    this.producer.on('error', (err) => this.onProducerError(err))
  }

  private onProducerReady = () => {
    console.log('ready')
    this.producerInitializing = false
    this.producerReady = true
    // Flush everything that was queued while the producer was starting
    for (const queueItem of this.sendQueue) {
      const payload = {
        topic: queueItem.topic,
        messages: JSON.stringify(queueItem.data)
      }
      console.log('payload', payload)
      this.producer.send([payload], queueItem.cb)
    }
    this.sendQueue = []
  }

  private onProducerError = (err: Error) => {
    console.error(err)
  }
}
const kafka = new MyKafkaClient() // using default conf
const topic = 'test-only'
kafka.enqueue(topic, { 'data': 'not ready yet' }, () => { })
setTimeout(() => {
  const data = [1, 2, 3, 4, 5]
  for (const item of data) {
    kafka.enqueue(topic, { 'data': item }, () => { })
  }
  kafka.close(done)
}, 300)
kafka-node:KafkaClient Connect attempt 1 +0ms
kafka-node:KafkaClient Trying to connect to host: localhost port: 9092 +4ms
kafka-node:KafkaClient Sending versions request to localhost:9092 +16ms
kafka-node:KafkaClient broker socket connected {"host":"localhost","port":"9092"} +2ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +2ms
kafka-node:KafkaClient Received versions response from localhost:9092 +11ms
kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"32":{"min":0,"max":1,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":false},"deleteTopics":{"min":0,"max":1,"usable":false},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
kafka-node:KafkaClient broker is now ready +1ms
kafka-node:KafkaClient updating metadatas +56ms
kafka-node:KafkaClient compressing messages if needed +13ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +6ms
kafka-node:KafkaClient Sending versions request to 10.200.193.213:9092 +1ms
kafka-node:KafkaClient Received versions response from 10.200.193.213:9092 +4ms
kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"32":{"min":0,"max":1,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":false},"deleteTopics":{"min":0,"max":1,"usable":false},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
kafka-node:KafkaClient broker is now ready +20ms
console.log src/models/OOKafkaClient.ts:31
kafka client connect
console.log src/models/OOKafkaClient.ts:115
ready
console.log src/models/OOKafkaClient.ts:125
payload { topic: 'test-only', messages: '{"data":"not ready yet"}' }
console.log src/models/OOKafkaClient.ts:31
kafka client connect
kafka-node:KafkaClient compressing messages if needed +182ms
kafka-node:KafkaClient compressing messages if needed +2ms
kafka-node:KafkaClient compressing messages if needed +1ms
kafka-node:KafkaClient compressing messages if needed +1ms
kafka-node:KafkaClient compressing messages if needed +0ms
kafka-node:KafkaClient close client +1ms
kafka-node:KafkaClient clearing 10.200.193.213:9092 callback queue without error +36ms
kafka-node:KafkaClient clearing localhost:9092 callback queue without error +1ms
kafka-node:Client refresh metadata currentAttempt 1 +3ms
kafka-node:Client refresh metadata currentAttempt 1 +4ms
kafka-node:Client refresh metadata currentAttempt 1 +3ms
kafka-node:Client refresh metadata currentAttempt 1 +3ms
kafka-node:Client refresh metadata currentAttempt 1 +2ms
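A note on the log above: `close client` is printed 1 ms after the last `compressing messages if needed`, and the script calls `kafka.close(done)` right after the loop, so the client may be closing while the five sends are still in flight. That is only one reading of the log, not a confirmed cause. If it is the case, something like the sketch below could defer the close until every send has called back. `PendingTracker` is a hypothetical helper, not part of kafka-node, and it assumes every tracked callback is eventually invoked.

```typescript
// Hypothetical helper: counts in-flight callback-style operations and runs
// the registered functions once all of them have called back.
class PendingTracker {
  private pending = 0
  private idleWaiters: Array<() => void> = []

  // Wrap a callback so the tracker knows when its operation has finished.
  track<A extends unknown[]>(cb: (...args: A) => void): (...args: A) => void {
    this.pending += 1
    let called = false
    return (...args: A): void => {
      cb(...args)
      if (called) return // guard against a callback that fires twice
      called = true
      this.pending -= 1
      if (this.pending === 0) this.flush()
    }
  }

  // Run fn now if nothing is in flight, otherwise once the count reaches zero.
  whenIdle(fn: () => void): void {
    if (this.pending === 0) fn()
    else this.idleWaiters.push(fn)
  }

  private flush(): void {
    const waiters = this.idleWaiters
    this.idleWaiters = []
    for (const fn of waiters) fn()
  }
}
```

In the repro script this would mean passing `tracker.track(() => { })` as the callback of each `kafka.enqueue(...)` call and replacing the direct close with `tracker.whenIdle(() => kafka.close(done))`.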
Hi Alex,
Did you find the cause of this? I am facing the same issue.
Thanks,
Shashank
Hi,
I am facing the same issue. Once I start the producer, it pushes messages to Kafka for some time. After a while, it throws the above error.
The listener is working fine.
kafka-node version: 4.1.1
node version: 10.15.3 (LTS)
TIA
Hi,
I am facing the same issue. Messages are sent normally for a while, and then an error like "Broker not available (sendRequest)" appears.
kafka-node version: 4.1.3
node version: 8.10.0
kafka version: 2.0.0
zookeeper version: 3.4.13
@rohit-rapido and @jimluo168, turning on debugging may give a clue as to why this happens.
I've got the same exact problem. Nothing in the debug logs suggests that there's a problem.
EDIT: figured it out. I was closing the Consumer which was inadvertently closing the same kafkaClient that the Producer was using.
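To illustrate the situation described in the EDIT above, here is a minimal, self-contained sketch. `DemoClient`, `DemoConsumer`, `DemoProducer` and `connectPair` are stand-ins invented for the example, not kafka-node classes; they only mimic the reported behaviour, where closing the consumer also closes the client it was given. With kafka-node the factory would presumably be something like `() => new Kafka.KafkaClient({ kafkaHost: 'localhost:9092' })`.

```typescript
// Stand-in for a Kafka client: it only remembers whether it was closed.
class DemoClient {
  closed = false
  close(): void {
    this.closed = true
  }
}

// Stand-in consumer: closing it also closes the client it was given.
class DemoConsumer {
  private readonly client: DemoClient
  constructor(client: DemoClient) {
    this.client = client
  }
  close(): void {
    this.client.close()
  }
}

// Stand-in producer: sending through a closed client fails.
class DemoProducer {
  private readonly client: DemoClient
  constructor(client: DemoClient) {
    this.client = client
  }
  send(message: string): string {
    if (this.client.closed) {
      throw new Error('Broker not available (sendRequest)')
    }
    return `sent:${message}`
  }
}

// Give each role its own client, so closing one cannot affect the other.
function connectPair(makeClient: () => DemoClient): { producer: DemoProducer, consumer: DemoConsumer } {
  return {
    producer: new DemoProducer(makeClient()),
    consumer: new DemoConsumer(makeClient())
  }
}
```

With a single shared `DemoClient`, `consumer.close()` makes the next `producer.send(...)` throw. With `connectPair(() => new DemoClient())`, the producer keeps working after the consumer is closed.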
I am also facing the same issue.
Node: 6.12.0
Kafka-node: 4.1.3
Error: "Broker not available (sendRequest)"
I am attaching the sample code which manages the connection with Kafka.
(I am using sendMessage() function to send event in Kafka topic)
https://gist.github.com/amolbarewar/06de1f5695bd8c4bde56cfa3c656f366
+1
Node: 8.16.1
kafka-node: 4.1.3
Error: Broker not available (sendRequest)
For me, the error seems to coincide with a rebalance, but it occurs while producing, which doesn't make any sense to me.
@hyperlink I am facing the same issue. It happens randomly in my production environment. Node version: v8.9.4; Kafka version: v2.0.0; kafka-node version: v4.1.3.
For now, my solution is to retry sending the message. But I still want to know: is this a bug?
It's happening for me too. I'm adding retry to my overly optimistic code.
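For anyone going the retry route, here is a minimal sketch of what that could look like. `sendWithRetry` and `RetryOptions` are hypothetical names, not part of kafka-node; the `send` argument is any callback-style function shaped like `producer.send(payloads, cb)`. The delay grows linearly with the attempt number, and the `schedule` option exists only so the timer can be swapped out, for example in tests.

```typescript
// Callback-style send, shaped like producer.send(payloads, cb).
// These types are local stand-ins, not imported from the library.
type SendCallback<T> = (err: Error | null, result?: T) => void
type SendFn<P, T> = (payloads: P, cb: SendCallback<T>) => void

interface RetryOptions {
  maxAttempts: number // total tries, including the first one
  delayMs: number     // base delay, multiplied by the attempt number
  schedule?: (fn: () => void, ms: number) => void // defaults to setTimeout
}

function sendWithRetry<P, T>(
  send: SendFn<P, T>,
  payloads: P,
  options: RetryOptions,
  cb: SendCallback<T>
): void {
  const schedule = options.schedule ||
    ((fn: () => void, ms: number): void => { setTimeout(fn, ms) })

  const attempt = (n: number): void => {
    send(payloads, (err, result) => {
      if (!err) {
        cb(null, result)
        return
      }
      if (n >= options.maxAttempts) {
        cb(err) // give up and surface the last error
        return
      }
      schedule(() => attempt(n + 1), options.delayMs * n)
    })
  }

  attempt(1)
}
```

Usage would be along the lines of `sendWithRetry((p, done) => producer.send(p, done), payloads, { maxAttempts: 3, delayMs: 200 }, cb)`. Note that this only papers over the error; it does not explain why the broker is reported as unavailable.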