Kafka-node: Create Topics (Array of JSON Topics) - Broker not available

Created on 28 Sep 2018  路  11Comments  路  Source: SOHU-Co/kafka-node

Basically the array of JSON objects fails to create the topics, but a simple array of strings to create topics works. See below.

Sample Code (Array of JSON objects)

const topicsToCreate = [{
topic: 'topic1',
partitions: 1,
replicationFactor: 2
},
{
topic: 'topic2',
partitions: 5,
replicationFactor: 3
}];

client.createTopics(topicsToCreate,(error, result) =>{
console.log(error);
console.log(result);

});

Error:
{ BrokerNotAvailableError: Broker not available
at new BrokerNotAvailableError (/code/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Socket. (/code/node_modules/kafka-node/lib/kafkaClient.js:769:19)
at emitOne (events.js:116:13)
at Socket.emit (events.js:211:7)
at TCP._handle.close [as _onclose] (net.js:554:12) message: 'Broker not available' }
undefined
sending { topic6: { '0': 3 } }

It works when I just send in a array of string like this:

client.createTopics(['j1','j2'],(error, result) =>{
console.log(error);
console.log(result);

});

Output:
topic "j1" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0

topic "topic6" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
topic "j2" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0

But want I really want is to setup the partitions.

  • Node 8
  • Kafka-node version: "kafka-node": "~3.0.1",
  • Kafka version: kafka_2.11-0.10.1.0 (Spotify docker)

Most helpful comment

Finally got the solution. I was doing on mistake. By default Apache Kafka create the 3 replication, this mean its by default create 3 broker as well. However aboe YAML create only one broker, and looknig for other 2 which is not created. hence we are getting the same error.

Fix

version: '3'
services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      ES_JAVA_OPTS: "-Xms512m -Xmx3000m"

then KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 tell the Apache kafka that we Wanted to create only one replication and its work

Happy coding :)

Thanks & Regards
Jaiswar Vipin Kumar R.

All 11 comments

No answers here?
I have a similar issue on 3.0.1
Also, the docs are more confusing than helpful.
Just check out the "example" about createTopics and you'll never know what's the actual right way to do it.
It's under Producer headline, but then in the code it says client.createTopics (?!) and if that's not enough, they don't show you what's the value inside topics.
There is topicsToCreate but no topics.
No matter what I tried, it was either 'broker not available' or some other error complaining about the type of the value of topic being an object instead of a string.
Please, this is a very basic usage of this lib, we just want to know how to set the number of partitions on the creation of a topic. shouldn't be so hard..............

I was just able to verify that using the kafkaClient, you can create a topic and pass in the amount of partitions you want so I'm not seeing where the issue is. Maybe the kafka version you are using?

Using Kafka 2.0 and kafka-node 3.0.1 (I've also tested with kafka_2.11-0.10.2.1):

this.client = new kafka.KafkaClient(this.options); // you can also use new kafka.Admin()
this.client.createTopics( [
    {
      topic: 'my-2nd-topic',
      partitions: 5,
      replicationFactor: 1
    }], (error, result) => {
       console.log(error); / /according to the protocol, the result is empty unless if there are errors
    });

And running the command line tools to inspect the partitions:

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-2nd-topic
Topic:my-2nd-topic  PartitionCount:5    ReplicationFactor:1 Configs:
    Topic: my-2nd-topic Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: my-2nd-topic Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: my-2nd-topic Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: my-2nd-topic Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: my-2nd-topic Partition: 4    Leader: 1001    Replicas: 1001  Isr: 1001

Alternatively to the command line, you can call client.loadMetadataForTopics([topicName]) which will also return the number of partitions for that topic.

Can you try using a different version of kafka?

+1

this.client.createTopics( [
{
topic: 'my-2nd-topic',
partitions: 5,
replicationFactor: 1
}], (error, result) => {
console.log(error); / /according to the protocol, the result is empty unless if there are errors
});

This works as documented without typescript, according to typescript the input for createTopics is string[], which works but does not let me specify the partitions or replication factor.

Another thing is the fact that if the call was made correctly, but the kafka response was still not satisfactory like 'topic already exists' the error is in the result and if satisfactory the result is empty.

We had similar issues and the reason was our replicationFactor value. For some reason our create topics request didn't give any errors and still no topics were created. We had only one kafka broker and replicationFactor was set to 2 and that's not allowed. You need to have same amount of brokers running as what you have defined to replicationFactor.

Hope this helps others struggling with this.

+1

+1

kafka-node 4.0.2

I can produce and cosume using kafka shell scripts, but not with kafka-node

Try to use 9092 port for connection

For me it is a issue with memory, I run kafka in kubernetes, and limited its memory to 1GB. That was the reason I was facing this issue.

When I increase it to 2GB, I don't see this issue anymore.

My docker-compose file

version: '3'
services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: localhost:32181
      KAFKA_LISTENERS: INTERNAL://localhost:9092,OUTSIDE://localhost:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://localhost:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      ES_JAVA_OPTS: "-Xms512m -Xmx3000m"

and Producer code is

var kafka = require('kafka-node'),
    Producer = kafka.Producer,
    KeyedMessage = kafka.KeyedMessage,
    client = new kafka.KafkaClient({kafkaHost:"localhost:9094"}),

    producer = new Producer(client),
    km = new KeyedMessage('key', 'message'),
    payloads = [
        { topic: 'topic1', messages: 'hi', partition: 0 },
        { topic: 'topic1', messages: ['hello', 'world', km] }
    ];

client.createTopics(topicsToCreate, (error, result) => {

                echo ("------------------------KAFAK--------------------")
                console.log(error);
                console.log(result);            
            });

Getting below error, while creating the topic before sending pay-load to topics

{ BrokerNotAvailableError: Broker not available (loadMetadataForTopics)
    at new BrokerNotAvailableError (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\errors\BrokerNotAvailableError.js:11:9)
    at KafkaClient.loadMetadataForTopics (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:891:21)
    at KafkaClient.loadMetadata (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:876:8)
    at KafkaClient.getController (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:267:8)
    at KafkaClient.sendControllerRequest (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:1219:8)
    at KafkaClient.createTopics (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:935:8)
    at C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\index_behind_kong.js:60:11
    at C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\baseClient.js:370:18
    at KafkaClient.loadMetadataForTopics (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:891:12)
    at RetryOperation._fn (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\baseClient.js:360:12) message: 'Broker not available (loadMetadataForTopics)' }
undefined

I have gave the delay of 5 seconds before calling to createtopic method, however no luck found.

Kindly assist.

Finally got the solution. I was doing on mistake. By default Apache Kafka create the 3 replication, this mean its by default create 3 broker as well. However aboe YAML create only one broker, and looknig for other 2 which is not created. hence we are getting the same error.

Fix

version: '3'
services:
  zookeeper:
    container_name: zookeeper
    image: confluentinc/cp-zookeeper
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2

  kafka:
    container_name: kafka
    image: confluentinc/cp-kafka
    ports:
      - "9094:9094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      ES_JAVA_OPTS: "-Xms512m -Xmx3000m"

then KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 tell the Apache kafka that we Wanted to create only one replication and its work

Happy coding :)

Thanks & Regards
Jaiswar Vipin Kumar R.

Was this page helpful?
0 / 5 - 0 ratings