NestedError: refreshBrokerMetadata failed, Caused By: Error: Unable to find available brokers to try
// Reporter's producer setup: a HighLevelProducer on a KafkaClient constructed with no arguments.
new kafka.HighLevelProducer(new kafka.KafkaClient(), {
// Configuration for when to consider a message as acknowledged, default 1
requireAcks: 1,
// The amount of time in milliseconds to wait for all acks before the request is considered timed out, default 100ms
ackTimeoutMs: 100,
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
// partitionerType: 2
})
This error occurs often. What is the cause?
I'm seeing it too.
Errors
{ NestedError: refreshBrokerMetadata failed
at async.waterfall (/node_modules/kafka-node/lib/kafkaClient.js:378:35)
at /node_modules/async/dist/async.js:473:16
at /node_modules/async/dist/async.js:5329:29)
at /node_modules/async/dist/async.js:969:16
I think I found a fix for this. It's not pretty, but it seems to work.
@rorysavage77 What is the fix?
@rorysavage77 what is your fix?
@rorysavage77 What is your fix?
@rorysavage77 What is that fix?
I wonder why this issue is closed. I'm experiencing it, and there doesn't seem to be any good fix or workaround.
Sorry for the late response. Here is my patch for kafkaClient.js:
--- kafkaClient.js 2020-07-28 13:05:28.000000000 -0400
+++ kafkaClient.js-new 2020-08-03 12:12:17.000000000 -0400
@@ -776,12 +776,12 @@
error = new errors.SaslAuthenticationError(null, message);
} else {
error = new errors.BrokerNotAvailableError('Broker not available (socket closed)');
- //if (!self.connecting && !brokerWrapper.isIdle()) {
- //logger.debug(`${self.clientId} schedule refreshBrokerMetadata()`);
- //setImmediate(function () {
- //self.refreshBrokerMetadata();
- //});
- //}
+ if (!self.connecting && !brokerWrapper.isIdle()) {
+ logger.debug(`${self.clientId} schedule refreshBrokerMetadata()`);
+ setImmediate(function () {
+ self.refreshBrokerMetadata();
+ });
+ }
}
}
self.clearCallbackQueue(this, error);
@@ -893,7 +893,7 @@
const ensureBrokerReady = (broker, cb) => {
if (!broker.isReady()) {
- logger.debug('missing apiSupport waiting until broker is ready...');
+ logger.debug('missing apiSupport waiting until broker is ready...(loadMetadataForTopics)');
this.waitUntilReady(broker, cb);
} else {
cb(null);
@@ -906,20 +906,13 @@
ensureBrokerReady(broker, cb);
},
cb => {
- /* Additional patching suggested by: https://github.com/SOHU-Co/kafka-node/issues/995 */
- try {
- const broker = this.brokerForLeader();
- const correlationId = this.nextId();
- const supportedCoders = getSupportedForRequestType(broker, 'metadata');
- const request = supportedCoders.encoder(this.clientId, correlationId, topics);
-
- this.queueCallback(broker.socket, correlationId, [supportedCoders.decoder, cb]);
- broker.write(request);
- } catch (err) {
- callback(err);
- }
-
+ const broker = this.brokerForLeader();
+ const correlationId = this.nextId();
+ const supportedCoders = getSupportedForRequestType(broker, 'metadata');
+ const request = supportedCoders.encoder(this.clientId, correlationId, topics);
+ this.queueCallback(broker.socket, correlationId, [supportedCoders.decoder, cb]);
+ broker.write(request);
}
],
(err, result) => {
@@ -1000,19 +993,25 @@
const onReady = () => {
logger.debug('broker is now ready');
- this._clearTimeout(timeoutId);
- timeoutId = null;
+
+ if (timeoutId !== null) {
+ this._clearTimeout(timeoutId);
+ timeoutId = null;
+ }
+
callback(null);
};
const timeout = this.options.requestTimeout;
const readyEventName = broker.getReadyEventName();
- timeoutId = this._createTimeout(() => {
- this.removeListener(readyEventName, onReady);
- this._timeouts.delete(timeoutId);
- callback(new TimeoutError(`Request timed out after ${timeout}ms`));
- }, timeout);
+ if (timeout !== false) {
+ timeoutId = this._createTimeout(() => {
+ this.removeListener(readyEventName, onReady);
+ this._timeouts.delete(timeoutId);
+ callback(new TimeoutError(`Request timed out after ${timeout}ms`));
+ }, timeout);
+ }
this.once(readyEventName, onReady);
};
@@ -1081,8 +1080,13 @@
const ensureBrokerReady = async.ensureAsync((leader, callback) => {
const broker = this.brokerForLeader(leader, longpolling);
+ if (!broker.isConnected()) {
+ this.refreshBrokerMetadata();
+ callback(new errors.BrokerNotAvailableError('Broker not available (sendRequest -> ensureBrokerReady)'));
+ return;
+ }
if (!broker.isReady()) {
- logger.debug('missing apiSupport waiting until broker is ready...');
+ logger.debug(`missing apiSupport waiting until broker is ready... (sendRequest ${request.type})`);
this.waitUntilReady(broker, callback);
} else {
callback(null);
@@ -1098,14 +1102,7 @@
ensureBrokerReady(leader, callback);
},
function (callback) {
- /* Updated sendToBroker call per:
- Suggestion from: https://github.com/SOHU-Co/kafka-node/issues/995
- */
- try {
- sendToBroker(payload, leader, callback);
- } catch (ex) {
- callback(ex);
- }
+ sendToBroker(payload, leader, callback);
}
],
function (error, results) {
@@ -1387,6 +1384,12 @@
return callback(new Error('Client is not ready (describeConfigs)'));
}
let err;
+
+ // Broker resource requests must go to the specific node
+ // other requests can go to any node
+ const brokerResourceRequests = [];
+ const nonBrokerResourceRequests = [];
+
_.forEach(payload.resources, function (resource) {
if (resourceTypeMap[resource.resourceType] === undefined) {
err = new Error(`Unexpected resource type ${resource.resourceType} for resource ${resource.resourceName}`);
@@ -1394,39 +1397,47 @@
} else {
resource.resourceType = resourceTypeMap[resource.resourceType];
}
+
+ if (resource.resourceType === resourceTypeMap['broker']) {
+ brokerResourceRequests.push(resource);
+ } else {
+ nonBrokerResourceRequests.push(resource);
+ }
});
+
if (err) {
return callback(err);
}
- const brokers = this.brokerMetadata;
- async.mapValuesLimit(
- brokers,
- this.options.maxAsyncRequests,
- (brokerMetadata, brokerId, cb) => {
- const broker = this.brokerForLeader(brokerId);
- if (!broker || !broker.isConnected()) {
- return cb(new errors.BrokerNotAvailableError('Broker not available (describeConfigs)'));
- }
-
- const correlationId = this.nextId();
- let apiVersion = 0;
- if (broker.apiSupport && broker.apiSupport.describeConfigs) {
- apiVersion = broker.apiSupport.describeConfigs.max;
- }
- apiVersion = Math.min(apiVersion, 2);
- const request = protocol.encodeDescribeConfigsRequest(this.clientId, correlationId, payload, apiVersion);
- this.sendWhenReady(broker, correlationId, request, protocol.decodeDescribeConfigsResponse(apiVersion), cb);
- },
- (err, results) => {
- if (err) {
- callback(err);
- return;
+ async.parallelLimit([
+ (cb) => {
+ if (nonBrokerResourceRequests.length > 0) {
+ this.sendRequestToAnyBroker('describeConfigs', [{ resources: nonBrokerResourceRequests, includeSynonyms: payload.includeSynonyms }], cb);
+ } else {
+ cb(null, []);
}
- results = _.values(results);
- callback(null, _.merge.apply({}, results));
+ },
+ ...brokerResourceRequests.map(r => {
+ return (cb) => {
+ this.sendRequestToBroker(r.resourceName, 'describeConfigs', [{ resources: [r], includeSynonyms: payload.includeSynonyms }], cb);
+ };
+ })
+ ], this.options.maxAsyncRequests, (err, result) => {
+ if (err) {
+ return callback(err);
}
- );
+
+ callback(null, _.flatten(result));
+ });
+};
+
+/**
+ * Sends a request to any broker in the cluster
+ */
+KafkaClient.prototype.sendRequestToAnyBroker = function (requestType, args, callback) {
+ // For now just select the first broker
+ const brokerId = Object.keys(this.brokerMetadata)[0];
+ this.sendRequestToBroker(brokerId, requestType, args, callback);
};
module.exports = KafkaClient;
@rorysavage77 Can you please send your full kafkaClient.js file?
Any idea why this started happening even with older versions of kafka-node?
Any update on this?
Most helpful comment
I wonder why this issue is closed. I'm experiencing it, and there doesn't seem to be any good fix or workaround.