I get this error after every 11 requests. Below is sample code that reproduces it.
I removed the event listeners from producer.js and added testKafka.js, but I still get the same error.
Please do the needful.
testKafka.js
// Reproduction script: builds a publisher, initialises a high-level producer
// and publishes a single message to the "test" topic.

// A relative path is required here; a bare 'producer.js' specifier is looked
// up as an installed package instead of the sibling file.
// NOTE(review): assumes producer.js sits next to this script - confirm.
const Producer = require('./producer.js');
// The logger was used below without ever being brought into scope.
// producer.js loads it from the "lgr" package, so the same package is used here.
const L = require('lgr');

const publisher = new Producer({
  kafkaHost: 'localhost:9092',
});

publisher.initProducer('high', (err) => {
  if (err) {
    L.critical('controller', 'Getting error in publisher configuration: ', err);
  } else {
    L.log('controller', 'publisher configured');
  }
});

const payload = [
  {
    topic: 'test',
    messages: 'Hello',
  },
];

// Report a failed publish instead of silently dropping the outcome.
publisher.publishData(payload, (err) => {
  if (err) {
    L.error('controller', 'Getting error while publishing: ', err);
  }
});

// The original exported an undefined `testKafka` identifier, which throws a
// ReferenceError when the module loads; export the configured publisher.
module.exports = publisher;
producer.js
/* jshint multistr: true, node: true*/
'use strict';
import kafka from "kafka-node";
import _ from "lodash";
import Q from "q";
import L from "lgr";
import CONFIG from "./config";
/**
 * Wrapper around a kafka-node client and producer.
 *
 * `initProducer` and `publishData` report their outcome twice: through the
 * optional Node-style callback and through the Q promise they return.
 */
class Producer {
  /**
   * Builds the client options and producer options from `configParams`,
   * falling back to CONFIG values for every setting that is not supplied.
   *
   * @param {Object} [configParams] - Setting overrides; `kafkaHost` is mandatory.
   * @throws {Error} When `kafkaHost` is not supplied.
   */
  constructor(configParams) {
    this.kafkaConfig = configParams ? configParams : {};

    this.options = {};
    this.options.kafkaHost = _.get(this.kafkaConfig, 'kafkaHost', null);
    this.options.connectTimeout = _.get(this.kafkaConfig, 'connectTimeout', CONFIG.KAFKA_CONFIG.CONNECTION_TIMEOUT);
    this.options.requestTimeout = _.get(this.kafkaConfig, 'requestTimeout', CONFIG.KAFKA_CONFIG.REQUEST_TIMEOUT);
    this.options.autoConnect = _.get(this.kafkaConfig, 'autoConnect', CONFIG.KAFKA_CONFIG.AUTO_CONNECT);
    // The caller-facing key is `retryOptions`; it is stored as `connectRetryOptions`.
    this.options.connectRetryOptions = _.get(this.kafkaConfig, 'retryOptions', CONFIG.RETRY_OPTIONS);
    this.options.maxAsyncRequests = _.get(this.kafkaConfig, 'maxAsyncRequests', CONFIG.KAFKA_CONFIG.MAX_ASYNC_REQUEST);
    this.options.idleConnection = _.get(this.kafkaConfig, 'idleConnection', CONFIG.KAFKA_CONFIG.IDLE_CONNECTION);

    this.producerOptions = {
      requireAcks: _.get(this.kafkaConfig, 'requireAcks', CONFIG.PRODUCER_ACKS),
      ackTimeoutMs: _.get(this.kafkaConfig, 'ackTimeoutMs', CONFIG.PRODUCER_ACK_TIMEOUT_MS),
    };

    if (this.options.kafkaHost === null) {
      L.error('Kafka Producer: ', CONFIG.MESSAGES.KAFKA_HOST_ERROR);
      throw new Error(CONFIG.MESSAGES.KAFKA_HOST_ERROR);
    }
  }

  /**
   * Creates the Kafka client on first use and reuses it on later calls, so
   * repeated initialisation does not create additional clients.
   */
  _initClient() {
    if (!this.client) {
      this.client = new kafka.KafkaClient(this.options);
    }
  }

  /**
   * Registers one-shot listeners on the current producer and forwards the
   * first occurrence of each event to the callback and the deferred.
   *
   * @param {Function} [cb] - Node-style callback `(err, data)`.
   * @param {Object} deferred - Q deferred settled with the outcome.
   */
  _bindListeners(cb, deferred) {
    this.producer.once('ready', () => {
      L.log('Producer is ready! You can publish messages now.');
      return this._prepareResponse(cb, deferred, null, CONFIG.MESSAGES.PRODUCER_READY);
    });
    this.producer.once('error', (err) => {
      L.error('Error in producer: ', err);
      return this._prepareResponse(cb, deferred, err, null);
    });
    // Arrow function on purpose: a plain `function` handler has its own
    // `this`, so `this._prepareResponse` is not reachable from it.
    // NOTE(review): 'SIGTERM' is normally a process signal; confirm that the
    // producer object actually emits it.
    this.producer.once('SIGTERM', () => {
      L.log('Shutdown producer!');
      return this._prepareResponse(cb, deferred, null, CONFIG.MESSAGES.PRODUCER_STOPPED);
    });
  }

  /**
   * Settles the deferred and, when a callback function was supplied, invokes
   * it Node-style.
   *
   * @param {Function} [cb] - Node-style callback; any non-function is ignored.
   * @param {Object} deferred - Q deferred to reject (on error) or resolve.
   * @param {*} error - Truthy value marks failure.
   * @param {*} data - Result passed on success.
   * @returns {*} Whatever the callback returns, or `undefined` without one.
   */
  _prepareResponse(cb, deferred, error, data) {
    if (error) {
      deferred.reject(error);
    } else {
      deferred.resolve(data);
    }
    // Checking the type (rather than comparing with the string 'undefined')
    // keeps a truthy non-function argument from being called.
    if (typeof cb === 'function') {
      return error ? cb(error) : cb(null, data);
    }
    return undefined;
  }

  /**
   * Creates the client (if needed) and a producer of the requested type.
   *
   * @param {string} producerType - Either 'simple' or 'high'.
   * @param {Function} [cb] - Node-style callback `(err, producer)`.
   * @returns {Object} Q promise resolved with the producer instance, or
   *   rejected with the validation message or construction error.
   */
  initProducer(producerType, cb) {
    const deferred = Q.defer();

    if (producerType !== 'simple' && producerType !== 'high') {
      L.error('initProducer: ', CONFIG.MESSAGES.PRODUCER_TYPE);
      this._prepareResponse(cb, deferred, CONFIG.MESSAGES.PRODUCER_TYPE, null);
      return deferred.promise;
    }

    this.producerType = producerType;
    try {
      this._initClient();
      this.producer = (this.producerType === 'simple')
        ? new kafka.Producer(this.client, this.producerOptions)
        : new kafka.HighLevelProducer(this.client, this.producerOptions);
    } catch (error) {
      L.error('initProducer: ', error);
      this._prepareResponse(cb, deferred, error, null);
      return deferred.promise;
    }

    // Reported outside the try block so that an exception thrown by the
    // callback is not caught and answered with a second callback invocation.
    this._prepareResponse(cb, deferred, null, this.producer);
    return deferred.promise;
  }

  /**
   * Sends `payload` through the initialised producer.
   *
   * @param {Array} payload - Passed unchanged to the producer's `send`.
   * @param {Function} [cb] - Node-style callback `(err, data)`.
   * @returns {Object} Q promise settled with the send result; rejected when
   *   no producer exists, the payload is missing, or sending fails.
   */
  publishData(payload, cb) {
    const deferred = Q.defer();

    if (!this.producer) {
      this._prepareResponse(cb, deferred, CONFIG.MESSAGES.PRODUCER_NOT_EXIST, null);
      return deferred.promise;
    }

    if (!payload) {
      L.error('publishData: ', CONFIG.MESSAGES.PAYLOAD_NOT_EXIST);
      this._prepareResponse(cb, deferred, CONFIG.MESSAGES.PAYLOAD_NOT_EXIST, null);
      return deferred.promise;
    }

    try {
      this.producer.send(payload, (err, data) => {
        this._prepareResponse(cb, deferred, err, data);
      });
    } catch (err) {
      L.error('publishData: ', err);
      this._prepareResponse(cb, deferred, err, null);
    }
    return deferred.promise;
  }
}
module.exports = Producer;
Thanks for your contribution!
You can safely ignore this. It's on my TODO list to PR changing this to a Promise to get around triggering these warnings.
why is the listener limit reached in the first place ?
@aikar I get this every time my producer is "ready" followed by TimeoutError: Request timed out after 30000ms when I try to send using the producer. Any fix?
still experiencing it during producer on "ready" event
I'm having the same issue today, any updates?
@soulmachine for me just a workaround though not the best solution,
require('events').EventEmitter.defaultMaxListeners = Infinity;
and increase node memory limit
Still see this problem now.
Most helpful comment
@aikar I get this every time my producer is "ready" followed by
TimeoutError: Request timed out after 30000ms when I try to send using the producer. Any fix?