Kafka-node: Will the producer send a message if I do not wait for the ready event?

Created on 12 May 2015 · 6 Comments · Source: SOHU-Co/kafka-node

At the point where I want to send a payload to the queue, I don't know whether the ready event has already fired (in which case a listener attached with on('ready') would never be called). If I call producer.send(payload) without wrapping it in producer.on('ready', function () { ... });, will the producer throw errors, or will it send the payload once it is ready?

This is just a question, not an issue, so I apologize if this is posted in the wrong place.

If I do have to listen for the ready event, I can probably move some code around and keep track of the ready state in a private variable.

All 6 comments

For our use case, we had to wrap the producer in a Promise (ES6 / Bluebird) that resolves on ready.
Calls made before that just collect their payloads, and as soon as the Promise resolves, the collected payloads are transmitted.

From that point on, everything is sent immediately (but still through the Promise).
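
The collect-then-flush idea described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code the commenter actually runs: `createBufferedSender` is a hypothetical helper, and `fakeProducer` is a stand-in built on Node's `EventEmitter` so the sketch runs without a broker. It assumes only that the real producer emits 'ready' and exposes send(payloads, cb), as discussed in this thread.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: wraps anything that emits 'ready' and has send(payloads, cb).
function createBufferedSender(producer) {
  var ready = false;
  var pending = []; // calls made before 'ready' fired

  producer.once('ready', function () {
    ready = true;
    // Flush the queued calls in the order they arrived.
    pending.splice(0).forEach(function (item) {
      producer.send(item.payloads, item.cb);
    });
  });

  return function send(payloads, cb) {
    if (ready) {
      producer.send(payloads, cb);
    } else {
      pending.push({ payloads: payloads, cb: cb });
    }
  };
}

// Stand-in producer (an assumption for this sketch, not kafka-node itself).
var fakeProducer = new EventEmitter();
fakeProducer.sent = [];
fakeProducer.send = function (payloads, cb) {
  this.sent.push(payloads);
  cb(null);
};

var send = createBufferedSender(fakeProducer);
var acks = 0;
function onSent(err) {
  if (err) throw err;
  acks += 1;
}

send([{ topic: 'whatever', messages: ['first'] }], onSent);
send([{ topic: 'whatever', messages: ['second'] }], onSent);
var sentBeforeReady = fakeProducer.sent.length; // nothing has been sent yet

fakeProducer.emit('ready'); // queued payloads are flushed here
send([{ topic: 'whatever', messages: ['third'] }], onSent); // goes straight through
```

With a real producer, the wrapper has to be created right after the producer is constructed, so that the 'ready' listener is attached before the event can fire.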

@mmiller42 yes, the message should be sent only when the producer is ready.

I'm a novice with node.js, event state management, and promises, so can someone:

  • elaborate on how to track/manage the ready state of the producer without Promises, for the case mmiller42 described, where the state may already be "ready" by the time I try to produce? It's not working for me right now.
  • zcei, could you provide sample code showing how you handle this with the Promise approach, including collecting payloads until the Promise is resolved?

@daluu I'm currently not using promises (though I probably should be), so implementing it is really this simple for me:

  • when the producer is ready, I save its ready state as a flag
  • whenever sending a message, I check if the producer is already ready; if so, I send immediately; else, I attach it to the ready event

Here's the simplest example of my implementation:

var kafka = require('kafka-node');

var producer = new kafka.Producer(...);
var producerReady = false;

// Remember that 'ready' has fired so later calls can send immediately.
producer.on('ready', function () {
  producerReady = true;
});

function sendToProducer (payloads, cb) {
  if (producerReady) {
    producer.send(payloads, cb);
  } else {
    // once() rather than on(): each deferred send should run a single time.
    producer.once('ready', function () {
      producer.send(payloads, cb);
    });
  }
}

sendToProducer([ { topic: 'whatever', messages: [ 'message' ] } ], function (err) {
  if (err) throw err;
  console.log('Sent payloads to producer!');
});

I'll give a more elaborate example tomorrow at work, but in the meantime you can try to get some hints from code here:

It isn't that clean, though. I'll try to point out the basic concept!

Oops, forgot to respond here.

I took @mmiller42's example and adapted it to the promise approach.

var kafka = require('kafka-node');

var producer = new kafka.Producer(...);

// Resolves once, when the producer emits 'ready'; later .then() calls run right away.
var whenProducerReady = new Promise(function (resolve, reject) {
  producer.on('ready', function () {
    resolve(producer);
  });
});

function sendToProducer (payloads, cb) {
  whenProducerReady
    .then(function (producer) {
      producer.send(payloads, cb);
    });
}

sendToProducer([ { topic: 'whatever', messages: [ 'message' ] } ], function (err) {
  if (err) throw err;
  console.log('Sent payloads to producer!');
});
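
One gap in the promise approach above is that the promise never settles if the producer fails before it becomes ready, so queued callbacks would wait forever. A possible variation, sketched here under assumptions, rejects the promise when an 'error' event arrives first and passes that error to the callback. `whenReady` and `createSender` are hypothetical helpers, and the producers below are `EventEmitter` stand-ins rather than real kafka-node producers; the sketch assumes the real producer reports failures through an 'error' event.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: resolves on 'ready', rejects if 'error' fires first.
// A real application would likely also keep its own long-lived 'error' listener.
function whenReady(producer) {
  return new Promise(function (resolve, reject) {
    producer.once('ready', function () { resolve(producer); });
    producer.once('error', reject);
  });
}

// Hypothetical helper: returns a send function that waits for readiness.
function createSender(producer) {
  var ready = whenReady(producer);
  return function send(payloads, cb) {
    ready.then(
      function (p) { p.send(payloads, cb); },
      function (err) { cb(err); } // producer failed before becoming ready
    );
  };
}

// Stand-in producers (assumptions for this sketch).
function makeFakeProducer() {
  var fake = new EventEmitter();
  fake.send = function (payloads, cb) { cb(null); };
  return fake;
}

var okProducer = makeFakeProducer();
var failingProducer = makeFakeProducer();
var okResults = [];
var failResults = [];

createSender(okProducer)([{ topic: 'whatever', messages: ['message'] }], function (err) {
  okResults.push(err ? err.message : 'sent');
});
createSender(failingProducer)([{ topic: 'whatever', messages: ['message'] }], function (err) {
  failResults.push(err ? err.message : 'sent');
});

okProducer.emit('ready');
failingProducer.emit('error', new Error('broker unreachable'));
```

The callbacks run asynchronously, after the current tick, because they go through the promise.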