I am trying to connect to an aws managed kafka instance, using ProducerStream. It refuses to connect to the kafkahost and gives me following error:
Producer error { Error: connect ECONNREFUSED 127.0.0.1:9092
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1113:14)
errno: 'ECONNREFUSED',
code: 'ECONNREFUSED',
syscall: 'connect',
address: '127.0.0.1',
port: 9092 }
Although I am passing kafkaHost parameter to the kafka client .
consumerStream is working perfectly with the same kafka client configuration.
const kafka = require('kafka-node')
const { Transform } = require('stream');
const _ = require('lodash');
const client = new kafka.KafkaClient({ kafkaHost: '<remoteip>:9092' })
, producer = new kafka.ProducerStream({kafkaClient: client});
const stdinTransform = new Transform({
objectMode: true,
decodeStrings: true,
transform (text, encoding, callback) {
let num = parseInt(text);
console.log(`Pushing message ${text} to ExampleTopic`);
let message = { num: num, method: 'two' }
callback(null, {
topic: 'log',
messages: JSON.stringify(message)
});
}
});
process.stdin.setEncoding('utf8');
process.stdin.pipe(stdinTransform).pipe(producer);
However , if i go to kafka-node/lib/producerStream.js and change the following configration
const DEFAULTS = {
kafkaClient: {
kafkaHost: '127.0.0.1:9092'
},
producer: {
partitionerType: 3
}
};
to:
const DEFAULTS = {
kafkaClient: {
kafkaHost: '<remoteip>:9092'
},
producer: {
partitionerType: 3
}
};
the producerStream works perfectly. Let me know if I am missing anything!
You don't pass an instantiated KafkaClient to the ProducerStream instead you pass the client's config
options to it and it will instantiate an instance of KafkaClient for you.
Shouldn't this be same across all API's.
Producer needs client
Consumer needs client
ConsumerStream needs client
ProducerStream needs options
It's not a good idea to share clients since there are state that's saved on the client object. Users are also more likely to share them when they are instantiated separately. Making it a config makes the API simpler and keep users from getting into trouble.
For anyone that this can help:
The ProducerStream constructor takes a kafkaClient property that receives the KafkaClientOptions instead of a client. (IMHO The naming is a bit confusing).
const kafkaClientOptions = { kafkaHost: 'localhost:9092' };
const producer = new ProducerStream({ kafkaClient: kafkaClientOptions });
Most helpful comment
Shouldn't this be same across all API's.
ProducerneedsclientConsumerneedsclientConsumerStreamneedsclientProducerStreamneedsoptions