Kafka-node: ProducerStream picking default kafkahost

Created on 18 Feb 2019 · 5 Comments · Source: SOHU-Co/kafka-node

Bug Report

I am trying to connect to an AWS-managed Kafka instance using ProducerStream. It does not connect to the configured kafkaHost; instead it tries the default host and fails with the following error:

Producer error { Error: connect ECONNREFUSED 127.0.0.1:9092
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1113:14)
  errno: 'ECONNREFUSED',
  code: 'ECONNREFUSED',
  syscall: 'connect',
  address: '127.0.0.1',
  port: 9092 }

This happens even though I am passing the kafkaHost parameter to the Kafka client.
ConsumerStream works perfectly with the same Kafka client configuration.

Environment

  • Node version: 11.10.0
  • Kafka-node version: 4.0.1
  • Kafka version: 1.1.1

For specific cases also provide

  • Number of Brokers: 3
  • Number partitions for topic: 1

Include Sample Code to reproduce behavior

const kafka = require('kafka-node')
const { Transform } = require('stream');
const _ = require('lodash'); 
const client = new kafka.KafkaClient({ kafkaHost: '<remoteip>:9092' })
    , producer = new kafka.ProducerStream({kafkaClient: client});

const stdinTransform = new Transform({
  objectMode: true,
  decodeStrings: true,
  transform (text, encoding, callback) {
    let num = parseInt(text);
    console.log(`Pushing message ${text} to ExampleTopic`);
    let message = { num: num, method: 'two' }
    callback(null, {
      topic: 'log',
      messages: JSON.stringify(message)
    });
  }
});



process.stdin.setEncoding('utf8');
process.stdin.pipe(stdinTransform).pipe(producer);

All 5 comments

However, if I go to kafka-node/lib/producerStream.js and change the following configuration

const DEFAULTS = {
  kafkaClient: {
    kafkaHost: '127.0.0.1:9092'
  },
  producer: {
    partitionerType: 3
  }
};

to:

const DEFAULTS = {
  kafkaClient: {
    kafkaHost: '<remoteip>:9092'
  },
  producer: {
    partitionerType: 3
  }
};

the producerStream works perfectly. Let me know if I am missing anything!

You don't pass an instantiated KafkaClient to the ProducerStream. Instead, you pass the client's config
options to it, and it will instantiate a KafkaClient for you.
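
A rough sketch of why the default host could win when a client instance is passed. It assumes the stream fills missing option keys from the DEFAULTS quoted above before building its own client, and that a client instance keeps its host under an `options` property rather than at the top level. `applyDefaults` and `FakeClient` are hypothetical stand-ins written for this illustration, not kafka-node code.

```javascript
// DEFAULTS as quoted from kafka-node/lib/producerStream.js earlier in the thread.
const DEFAULTS = {
  kafkaClient: { kafkaHost: '127.0.0.1:9092' },
  producer: { partitionerType: 3 }
};

// Hypothetical deep-defaults helper (an assumption about the merge step,
// not the library's implementation): fills in keys missing from `target`.
function applyDefaults(target, defaults) {
  for (const [key, value] of Object.entries(defaults)) {
    if (value !== null && typeof value === 'object') {
      if (target[key] === undefined) target[key] = {};
      if (target[key] !== null && typeof target[key] === 'object') {
        applyDefaults(target[key], value);
      }
    } else if (target[key] === undefined) {
      target[key] = value;
    }
  }
  return target;
}

// Hypothetical stand-in for an already-built client. Its host is nested under
// `options`, so the merge sees no top-level `kafkaHost` and adds the default.
class FakeClient {
  constructor(options) {
    this.options = options;
  }
}

const withInstance = applyDefaults(
  { kafkaClient: new FakeClient({ kafkaHost: 'broker.example:9092' }) },
  DEFAULTS
);
const withOptions = applyDefaults(
  { kafkaClient: { kafkaHost: 'broker.example:9092' } },
  DEFAULTS
);

console.log(withInstance.kafkaClient.kafkaHost); // 127.0.0.1:9092
console.log(withOptions.kafkaClient.kafkaHost);  // broker.example:9092
```

Under these assumptions, only the plain options object keeps the configured host, which matches the ECONNREFUSED 127.0.0.1:9092 error in the report.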

Shouldn't this be the same across all APIs?
Producer needs a client
Consumer needs a client
ConsumerStream needs a client
ProducerStream needs options

It's not a good idea to share clients, since state is saved on the client object. Users are also more likely to share clients when they are instantiated separately. Making it a config makes the API simpler and keeps users from getting into trouble.

For anyone that this can help:
The ProducerStream constructor takes a kafkaClient property that receives the KafkaClientOptions instead of a client. (IMHO the naming is a bit confusing.)

const kafkaClientOptions = { kafkaHost: 'localhost:9092' };

const producer = new ProducerStream({ kafkaClient: kafkaClientOptions });