Basically the array of JSON objects fails to create the topics, but a simple array of strings to create topics works. See below.
Sample Code (Array of JSON objects)
const topicsToCreate = [{
topic: 'topic1',
partitions: 1,
replicationFactor: 2
},
{
topic: 'topic2',
partitions: 5,
replicationFactor: 3
}];
client.createTopics(topicsToCreate,(error, result) =>{
console.log(error);
console.log(result);
});
Error:
{ BrokerNotAvailableError: Broker not available
at new BrokerNotAvailableError (/code/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Socket.
at emitOne (events.js:116:13)
at Socket.emit (events.js:211:7)
at TCP._handle.close [as _onclose] (net.js:554:12) message: 'Broker not available' }
undefined
sending { topic6: { '0': 3 } }
It works when I just send in a array of string like this:
client.createTopics(['j1','j2'],(error, result) =>{
console.log(error);
console.log(result);
});
Output:
topic "j1" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
topic "topic6" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
topic "j2" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
But want I really want is to setup the partitions.
No answers here?
I have a similar issue on 3.0.1
Also, the docs are more confusing than helpful.
Just check out the "example" about createTopics and you'll never know what's the actual right way to do it.
It's under Producer headline, but then in the code it says client.createTopics (?!) and if that's not enough, they don't show you what's the value inside topics.
There is topicsToCreate but no topics.
No matter what I tried, it was either 'broker not available' or some other error complaining about the type of the value of topic being an object instead of a string.
Please, this is a very basic usage of this lib, we just want to know how to set the number of partitions on the creation of a topic. shouldn't be so hard..............
I was just able to verify that using the kafkaClient, you can create a topic and pass in the amount of partitions you want so I'm not seeing where the issue is. Maybe the kafka version you are using?
Using Kafka 2.0 and kafka-node 3.0.1 (I've also tested with kafka_2.11-0.10.2.1):
this.client = new kafka.KafkaClient(this.options); // you can also use new kafka.Admin()
this.client.createTopics( [
{
topic: 'my-2nd-topic',
partitions: 5,
replicationFactor: 1
}], (error, result) => {
console.log(error); / /according to the protocol, the result is empty unless if there are errors
});
And running the command line tools to inspect the partitions:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-2nd-topic
Topic:my-2nd-topic PartitionCount:5 ReplicationFactor:1 Configs:
Topic: my-2nd-topic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: my-2nd-topic Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: my-2nd-topic Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: my-2nd-topic Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: my-2nd-topic Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
Alternatively to the command line, you can call client.loadMetadataForTopics([topicName]) which will also return the number of partitions for that topic.
Can you try using a different version of kafka?
+1
this.client.createTopics( [
{
topic: 'my-2nd-topic',
partitions: 5,
replicationFactor: 1
}], (error, result) => {
console.log(error); / /according to the protocol, the result is empty unless if there are errors
});
This works as documented without typescript, according to typescript the input for createTopics is string[], which works but does not let me specify the partitions or replication factor.
Another thing is the fact that if the call was made correctly, but the kafka response was still not satisfactory like 'topic already exists' the error is in the result and if satisfactory the result is empty.
We had similar issues and the reason was our replicationFactor value. For some reason our create topics request didn't give any errors and still no topics were created. We had only one kafka broker and replicationFactor was set to 2 and that's not allowed. You need to have same amount of brokers running as what you have defined to replicationFactor.
Hope this helps others struggling with this.
+1
+1
kafka-node 4.0.2
I can produce and cosume using kafka shell scripts, but not with kafka-node
Try to use 9092 port for connection
For me it is a issue with memory, I run kafka in kubernetes, and limited its memory to 1GB. That was the reason I was facing this issue.
When I increase it to 2GB, I don't see this issue anymore.
My docker-compose file
version: '3'
services:
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper
ports:
- "32181:32181"
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka:
container_name: kafka
image: confluentinc/cp-kafka
ports:
- "9094:9094"
environment:
KAFKA_ZOOKEEPER_CONNECT: localhost:32181
KAFKA_LISTENERS: INTERNAL://localhost:9092,OUTSIDE://localhost:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://localhost:9092,OUTSIDE://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
ES_JAVA_OPTS: "-Xms512m -Xmx3000m"
and Producer code is
var kafka = require('kafka-node'),
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
client = new kafka.KafkaClient({kafkaHost:"localhost:9094"}),
producer = new Producer(client),
km = new KeyedMessage('key', 'message'),
payloads = [
{ topic: 'topic1', messages: 'hi', partition: 0 },
{ topic: 'topic1', messages: ['hello', 'world', km] }
];
client.createTopics(topicsToCreate, (error, result) => {
echo ("------------------------KAFAK--------------------")
console.log(error);
console.log(result);
});
Getting below error, while creating the topic before sending pay-load to topics
{ BrokerNotAvailableError: Broker not available (loadMetadataForTopics)
at new BrokerNotAvailableError (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\errors\BrokerNotAvailableError.js:11:9)
at KafkaClient.loadMetadataForTopics (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:891:21)
at KafkaClient.loadMetadata (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:876:8)
at KafkaClient.getController (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:267:8)
at KafkaClient.sendControllerRequest (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:1219:8)
at KafkaClient.createTopics (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:935:8)
at C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\index_behind_kong.js:60:11
at C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\baseClient.js:370:18
at KafkaClient.loadMetadataForTopics (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\kafkaClient.js:891:12)
at RetryOperation._fn (C:\wamp64\www\ws-proxy\HOZ-KOG-WebSocket_NodeJS\node_modules\kafka-node\lib\baseClient.js:360:12) message: 'Broker not available (loadMetadataForTopics)' }
undefined
I have gave the delay of 5 seconds before calling to createtopic method, however no luck found.
Kindly assist.
Finally got the solution. I was doing on mistake. By default Apache Kafka create the 3 replication, this mean its by default create 3 broker as well. However aboe YAML create only one broker, and looknig for other 2 which is not created. hence we are getting the same error.
Fix
version: '3'
services:
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper
ports:
- "32181:32181"
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka:
container_name: kafka
image: confluentinc/cp-kafka
ports:
- "9094:9094"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_LISTENERS: INTERNAL://:9092,OUTSIDE://:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://:9092,OUTSIDE://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ES_JAVA_OPTS: "-Xms512m -Xmx3000m"
then KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 tell the Apache kafka that we Wanted to create only one replication and its work
Happy coding :)
Thanks & Regards
Jaiswar Vipin Kumar R.
Most helpful comment
Finally got the solution. I was doing on mistake. By default Apache Kafka create the 3 replication, this mean its by default create 3 broker as well. However aboe YAML create only one broker, and looknig for other 2 which is not created. hence we are getting the same error.
Fix
then
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1tell the Apache kafka that we Wanted to create only one replication and its workHappy coding :)
Thanks & Regards
Jaiswar Vipin Kumar R.