Kafka-node: I'm able to connect to kafka, but producer send gives me timeout

Created on 28 May 2019  路  4Comments  路  Source: SOHU-Co/kafka-node

I was trying to write a Winston Transport that sends every log of my application to Kafka.
The following code is what i'm using

Kafka client is instantiated with the following code.

const logger = createLogger({
  transports: [
    new KafkaTransport({
      kafkaHost: process.env.KAFKA_HOST || 'localhost:9092',
    })
  ]
});

And the code of Winston Transport is:

const { KafkaClient, HighLevelProducer, Consumer } = require('kafka-node')
const Transport = require('winston-transport')

module.exports = class KafkaTransport extends Transport {
  constructor(options) {
    super(options);

    if(!options.kafkaHost)
      throw new Error('The property options.kafkaHost is required')

    options = options || {}

    this.connect(options)    
  }

  connect(options) {
    this.client = new KafkaClient(options)

    this.producer = new HighLevelProducer(this.client, {
      requireAcks: 1,
      ackTimeoutMs: 100
    });

    this.client.on('error', (error, teste, teste2) => {
      throw new Error('Client connection error')
    });

    this.producer.on('error', (error) => {            
      throw new Error('Producer connection error')
    });  

    global.kafka.producer.on('ready', () => {
      console.log('KAFKA Connected')      
    });
  }

  getStatus() {
    let clientReady = (this.client && this.client.ready)
    let producerReady = (this.producer && this.producer.ready)

    if(!clientReady || !producerReady) {
      return false
    }

    return true
  }

  sendMessage(payload) {
    global.kafka.producer.send(payload, function (error, result) {
      if (error) {
        throw new Error('Error sending message')
      } else {        
        console.log('[KAFKA] MESSAGE SENT', result)
      }
    });
  }

  log(info, callback) {
    const payload = [{
      topic: 'teste topic',
      messages: [info],
      attributes: 1,
      timestamp: new Date().getTime()
    }]

    if(this.getStatus(global.kafka)) {
      this.sendMessage(payload)
    } else {
      this.connect()
      this.sendMessage(payload)
    }

    if(callback)
        callback();
  }
};

The problem that i'm facing now is, the client emits that is connected (ready) but can't send any message.

{ TimeoutError: Request timed out after 30000ms
at new TimeoutError (/opt/app/node_modules/kafka-node/lib/errors/TimeoutError.js:6:9)
at Timeout.timeoutId._createTimeout [as _onTimeout] (/opt/app/node_modules/kafka-node/lib/kafkaClient.js:1007:14)
at ontimeout (timers.js:475:11)
at tryOnTimeout (timers.js:310:5)
at Timer.listOnTimeout (timers.js:270:5) message: 'Request timed out after 30000ms' }

My kafka is installed on Openshift and what i tried so far was:

Connecting direct to container with kafka-console-producer e kafka-console-consumer and both works as expected. Which means zookeeper is online and working.

I have been able to create a topic using code above, but i haved remove this parte, but for consideration, i am capable to estabilish a connection to kafka servers, my only problem is send message itself.

Any ideas?

Thanks in advance

Most helpful comment

I have been able to solve the problem.

For further help, i will describe it here:

We use Openshift here and the problem it was i had not set advertise.host.name, and reading docs from kafka, if i do not supply this information, he will use whatever returns on getCanonicalHostName() from java, which is the pod name.

Once in our openshift you cannot connect directly to pod, you have to use service names, thats the why i was receiving timeouts... Once i setted the advertise.host.name to my service name, the kafka starts to reply as expected.

Thanks for everthing, and i will close this issue now.

All 4 comments

Turn on debugging and maybe it will give some more clues.

I have been able to solve the problem.

For further help, i will describe it here:

We use Openshift here and the problem it was i had not set advertise.host.name, and reading docs from kafka, if i do not supply this information, he will use whatever returns on getCanonicalHostName() from java, which is the pod name.

Once in our openshift you cannot connect directly to pod, you have to use service names, thats the why i was receiving timeouts... Once i setted the advertise.host.name to my service name, the kafka starts to reply as expected.

Thanks for everthing, and i will close this issue now.

You need start the Kafka servir first and then start your application! It works for me.

Hi Kobuti, I am receiving same error, when using custom node.js producer/consumer kafkaclient for OCP kafka cluster. Where as it runs fine with console producer and consumer. There was a mention that you could solve it using advertise.host.name, can u please elaborate how it is done. Thanks.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

muradm picture muradm  路  5Comments

aamitsharma2705 picture aamitsharma2705  路  4Comments

comrat picture comrat  路  5Comments

nithjino picture nithjino  路  3Comments

cheungwsj picture cheungwsj  路  5Comments