Kafka-node: (node) warning: possible EventEmitter memory leak detected. 11 -ready listeners added. Use emitter.setMaxListeners() to increase limit.

Created on 31 Jul 2018 · 7 Comments · Source: SOHU-Co/kafka-node

Questions?

I get this warning after every 11 requests. Below is sample code that reproduces it.
I have removed the event listeners from producer.js and added testKafka.js, but I still get the same warning.
Please take a look.

Bug Report

Environment

Include Sample Code to reproduce the behavior

testKafka.js
// Reproduction script: configures a high-level producer against a local
// broker and publishes a single message to the "test" topic.

// A relative path is required; a bare 'producer.js' is looked up in node_modules.
const Producer = require('./producer.js');
// The logger was used below without ever being brought into scope.
const L = require('lgr');

const publisher = new Producer({
    kafkaHost: 'localhost:9092'
});

publisher.initProducer('high', (err) => {
    if (err) {
        L.critical('controller', 'Getting error in publisher configuration: ', err);
    } else {
        L.log('controller', 'publisher configured');
    }
});

const payload = [{
    topic: 'test',
    messages: 'Hello'
}];

// Pass a callback so a failed publish is reported instead of being dropped.
publisher.publishData(payload, (err) => {
    if (err) {
        L.critical('controller', 'Getting error while publishing: ', err);
    }
});

// NOTE(review): the original exported an undefined identifier `testKafka`,
// which throws a ReferenceError on load. Exporting the configured publisher
// is an assumption about the intent; confirm against the real caller.
module.exports = publisher;

producer.js
/* jshint multistr: true, node: true*/
'use strict';

import kafka from "kafka-node";
import _ from "lodash";
import Q from "q";
import L from "lgr";
import CONFIG from "./config";

/**
 * Wrapper around a kafka-node client and producer.
 *
 * `initProducer` and `publishData` report their outcome twice: through the
 * optional Node-style callback and through the Q promise they return.
 */
class Producer {
    /**
     * Collects client and producer options, falling back to CONFIG defaults.
     *
     * @param {Object} [configParams] - Overrides; `kafkaHost` is mandatory.
     * @throws {Error} When no `kafkaHost` is supplied.
     */
    constructor(configParams) {
        this.kafkaConfig = configParams || {};

        this.options = {};
        this.options.kafkaHost = _.get(this.kafkaConfig, 'kafkaHost', null);
        this.options.connectTimeout = _.get(this.kafkaConfig, 'connectTimeout', CONFIG.KAFKA_CONFIG.CONNECTION_TIMEOUT);
        this.options.requestTimeout = _.get(this.kafkaConfig, 'requestTimeout', CONFIG.KAFKA_CONFIG.REQUEST_TIMEOUT);
        this.options.autoConnect = _.get(this.kafkaConfig, 'autoConnect', CONFIG.KAFKA_CONFIG.AUTO_CONNECT);
        this.options.connectRetryOptions = _.get(this.kafkaConfig, 'retryOptions', CONFIG.RETRY_OPTIONS);
        this.options.maxAsyncRequests = _.get(this.kafkaConfig, 'maxAsyncRequests', CONFIG.KAFKA_CONFIG.MAX_ASYNC_REQUEST);
        this.options.idleConnection = _.get(this.kafkaConfig, 'idleConnection', CONFIG.KAFKA_CONFIG.IDLE_CONNECTION);

        this.producerOptions = {
            "requireAcks"   : _.get(this.kafkaConfig, 'requireAcks', CONFIG.PRODUCER_ACKS),
            "ackTimeoutMs"  : _.get(this.kafkaConfig, 'ackTimeoutMs', CONFIG.PRODUCER_ACK_TIMEOUT_MS)
        };

        if (this.options.kafkaHost === null) {
            L.error('Kafka Producer: ', CONFIG.MESSAGES.KAFKA_HOST_ERROR);
            throw new Error(CONFIG.MESSAGES.KAFKA_HOST_ERROR);
        }
    }

    /**
     * Creates the KafkaClient on first use and keeps it for later calls.
     */
    _initClient() {
        if (!this.client) {
            this.client = new kafka.KafkaClient(this.options);
        }
    }

    /**
     * Registers one-shot listeners on the current producer that settle
     * `deferred` (and call `cb`) on 'ready', 'error' or 'SIGTERM'.
     * No method of this class calls it.
     *
     * @param {Function} [cb] - Node-style callback.
     * @param {Object} deferred - Q deferred to settle.
     */
    _bindListeners(cb, deferred) {
        this.producer.once('ready', () => {
            L.log('Producer is ready! You can publish messages now.');
            return this._prepareResponse(cb, deferred, null, CONFIG.MESSAGES.PRODUCER_READY);
        });

        this.producer.once('error', (err) => {
            L.error('Error in producer: ', err);
            return this._prepareResponse(cb, deferred, err, null);
        });

        // Arrow function so `this` is the wrapper; with a plain function
        // `this` is the emitter and `_prepareResponse` does not exist on it.
        // NOTE(review): SIGTERM is normally a process signal; confirm the
        // producer actually emits an event with this name.
        this.producer.once('SIGTERM', () => {
            L.log('Shutdown producer!');
            return this._prepareResponse(cb, deferred, null, CONFIG.MESSAGES.PRODUCER_STOPPED);
        });
    }

    /**
     * Settles `deferred` and, when `cb` is a function, invokes it Node-style.
     *
     * @param {Function} [cb] - Callback; any non-function value is ignored.
     * @param {Object} deferred - Q deferred to reject or resolve.
     * @param {*} error - Truthy value rejects the deferred.
     * @param {*} data - Resolution value when there is no error.
     * @returns {*} Whatever `cb` returns, or undefined without a callback.
     */
    _prepareResponse(cb, deferred, error, data) {
        if (error) {
            deferred.reject(error);
        } else {
            deferred.resolve(data);
        }

        // typeof check: the old comparison against the string 'undefined'
        // let any truthy non-function through and then failed on the call.
        if (typeof cb === 'function') {
            return error ? cb(error) : cb(null, data);
        }
        return undefined;
    }

    /**
     * Creates (or reuses) the underlying kafka-node producer.
     *
     * @param {string} producerType - 'simple' or 'high'.
     * @param {Function} [cb] - Called with (err) or (null, producer).
     * @returns {Promise} Q promise resolved with the producer instance.
     */
    initProducer(producerType, cb) {
        const deferred = Q.defer();

        if (producerType !== 'simple' && producerType !== 'high') {
            L.error('initProducer: ', CONFIG.MESSAGES.PRODUCER_TYPE);
            this._prepareResponse(cb, deferred, CONFIG.MESSAGES.PRODUCER_TYPE, null);
            return deferred.promise;
        }

        try {
            this._initClient();

            // Reuse the producer when the type is unchanged. Building a new
            // one per call stacks producers on the same shared client.
            // NOTE(review): presumably each producer also subscribes to
            // client events, which would explain the listener warning;
            // verify against kafka-node.
            if (!this.producer || this.producerType !== producerType) {
                this.producer = (producerType === 'simple') ?
                    new kafka.Producer(this.client, this.producerOptions) :
                    new kafka.HighLevelProducer(this.client, this.producerOptions);
            }
            this.producerType = producerType;
        } catch (error) {
            L.error('initProducer: ', error);
            this._prepareResponse(cb, deferred, error, null);
            return deferred.promise;
        }

        // Outside the try block so a throwing callback is not invoked a
        // second time with its own exception.
        this._prepareResponse(cb, deferred, null, this.producer);
        return deferred.promise;
    }

    /**
     * Sends `payload` through the initialised producer.
     *
     * @param {Array} payload - Value passed unchanged to `producer.send`.
     * @param {Function} [cb] - Called with (err) or (null, data).
     * @returns {Promise} Q promise settled with the send result.
     */
    publishData(payload, cb) {
        const deferred = Q.defer();

        if (!this.producer) {
            this._prepareResponse(cb, deferred, CONFIG.MESSAGES.PRODUCER_NOT_EXIST, null);
            return deferred.promise;
        }

        if (!payload) {
            L.error('publishData: ', CONFIG.MESSAGES.PAYLOAD_NOT_EXIST);
            this._prepareResponse(cb, deferred, CONFIG.MESSAGES.PAYLOAD_NOT_EXIST, null);
            return deferred.promise;
        }

        try {
            this.producer.send(payload, (err, data) => {
                this._prepareResponse(cb, deferred, err, data);
            });
        } catch (err) {
            L.error('publishData: ', err);
            this._prepareResponse(cb, deferred, err, null);
        }

        return deferred.promise;
    }
}

module.exports = Producer;

Include output with Debug turned on

Thanks for your contribution!

Most helpful comment

@aikar I get this every time my producer is "ready" followed by TimeoutError: Request timed out after 30000ms when I try to send using the producer. Any fix?

All 7 comments

You can safely ignore this. It's on my TODO list to PR changing this to a Promise to get around triggering these warnings.

Why is the listener limit reached in the first place?

@aikar I get this every time my producer is "ready" followed by TimeoutError: Request timed out after 30000ms when I try to send using the producer. Any fix?

still experiencing it during producer on "ready" event

I'm having the same issue today, any updates?

@soulmachine For me this is just a workaround, though not the best solution:

// Workaround only: this lifts the listener cap for every EventEmitter in the
// process, so the warning is silenced but the extra listeners are still added.
require('events').EventEmitter.defaultMaxListeners = Infinity;

and increase the Node.js memory limit.

Still see this problem now.

Was this page helpful?
0 / 5 - 0 ratings