Does KafkaClient tolerate broker failures and try to connect to other brokers?
Hi team, I am using KafkaClient as the client parameter of Producer, and I set kafkaHost to _"host1:port1,host2:port2,host3:port3"_. Sending a message works. Then I kill the broker at host1:port1 and send a message again using the cached producer. It gets stuck until the request times out, with the error _TimeoutError: Request timed out after 10000ms_. In debug mode, the kafka-node log always shows kafka-node:KafkaClient kafka-node-client reconnecting to host1:port1... Will it try to connect to host2:port2 or host3:port3, which I also provided? Also, when I debug client.js, the getBrokers method doesn't seem to include all the brokers I provided: sometimes it returns only host1:port1, and sometimes [host1:port1, host2:port2].
```javascript
var kafka = require('kafka-node');

var client = new kafka.KafkaClient({
  kafkaHost: "host1:port1,host2:port2,host3:port3",
  connectTimeout: 10000,
  requestTimeout: 10000
});
var producer = new kafka.Producer(client);
```
kafka-node:KafkaClient checking payload topic/partitions has leaders
kafka-node:KafkaClient found leaders for all
kafka-node:KafkaClient sending request
kafka-node:KafkaClient grouped requests by 1 brokers ["0"]
kafka-node:KafkaClient missing apiSupport waiting until broker is ready...
kafka-node:KafkaClient kafka-node-client reconnecting to host1:port1
kafka-node:KafkaClient kafka-node-client reconnecting to host1:port1
kafka-node:KafkaClient kafka-node-client reconnecting to host1:port1
kafka-node:KafkaClient kafka-node-client reconnecting to host1:port1
....
TimeoutError: Request timed out after 10000ms
Best Regards,
Thanks!
KafkaClient uses the brokers you provide to _discover_ the other brokers in the cluster. It does this by connecting to them round-robin and issuing a metadata request. Once that succeeds, it uses the official broker list from Kafka and connects only to the brokers it needs to do its job. That may not be your entire cluster, which explains why you only see connections to the leaders of the topic you're producing to. Hope that helps.
Thanks for replying!
So the _kafkaHost_ I provide to KafkaClient is only used to discover the topic's leader broker in the cluster, is that correct? If so, my understanding is that when I connect to Kafka via KafkaClient and a topic's leader broker fails, it will not find the new leader and reconnect to it? By the way, this works if I connect via Client, which connects to ZooKeeper: as long as not all brokers have failed, it keeps working.
There's a mechanism in place so that if a specific broker has issues, the client does another metadata refresh against a different broker to obtain the new list of available brokers. Hopefully, subsequent produce or fetch requests will be OK after that. Are you finding that this is not working?
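A minimal sketch of the discovery step described above, assuming an injected `requestMetadata(host, callback)` function (hypothetical, not a kafka-node API). It tries each `kafkaHost` entry in order until one answers a metadata request, then switches to the broker list returned by Kafka. The real KafkaClient's rotation and retry behavior may differ.

```javascript
// Hypothetical sketch of bootstrap discovery; not kafka-node's implementation.
// Split "host1:port1,host2:port2" into a clean list of bootstrap hosts.
function parseKafkaHost(kafkaHost) {
  return kafkaHost
    .split(',')
    .map((s) => s.trim())
    .filter((s) => s.length > 0);
}

// Try each bootstrap host in turn until one answers a metadata request.
// `requestMetadata(host, cb)` is an assumed, injected function.
function discoverBrokers(kafkaHost, requestMetadata, callback) {
  const hosts = parseKafkaHost(kafkaHost);
  const errors = [];
  let index = 0;

  function tryNext() {
    if (index >= hosts.length) {
      return callback(new Error('No bootstrap host answered: ' + errors.join('; ')));
    }
    const host = hosts[index++];
    requestMetadata(host, (err, metadata) => {
      if (err) {
        errors.push(host + ' (' + err.message + ')');
        return tryNext();
      }
      // From here on the broker list comes from Kafka's metadata response,
      // not from the kafkaHost string.
      callback(null, metadata.brokers);
    });
  }

  tryNext();
}

// Usage with a fake metadata call in which host1 is down:
discoverBrokers('host1:9092,host2:9092,host3:9092', (host, cb) => {
  if (host === 'host1:9092') return cb(new Error('ECONNREFUSED'));
  cb(null, { brokers: { 2: { host: 'host2', port: 9092 }, 3: { host: 'host3', port: 9092 } } });
}, (err, brokers) => {
  console.log(err, Object.keys(brokers)); // null [ '2', '3' ]
});
```

This also illustrates why the broker list you see later can be smaller than `kafkaHost`: after bootstrap, only Kafka's answer matters.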
Hi, I debugged again and found something.
If I kill the leader broker of a topic, the client gets stuck and KafkaClient does NOT try to refresh metadata. If I kill a broker that is not the leader of this topic, it still works. Then I dove into the code: in KafkaClient.js, lines 789 to 801:
```javascript
KafkaClient.prototype.leaderLessPayloads = function (payloads) {
  return _.filter(payloads, payload => !this.hasMetadata(payload.topic, payload.partition));
};

// ... later, where the payloads are checked for leaders:
const leaderLessPayloads = this.leaderLessPayloads(payloads);
if (leaderLessPayloads.length === 0) {
  logger.debug('found leaders for all');
  return callback(null);
}
```
And in Client.js, lines 598 to 603:
```javascript
Client.prototype.hasMetadata = function (topic, partition) {
  var brokerMetadata = this.brokerMetadata;
  var leader = this.leaderByPartition(topic, partition);
  return leader !== undefined && brokerMetadata[leader];
};
```
If I kill the leader of a topic, the leader broker still exists in brokerMetadata in Client.js, so leaderLessPayloads.length === 0 is always true, and the client keeps reconnecting to the old leader broker... I think Kafka elects a new leader for the topic when the old leader is killed, doesn't it? So why isn't the broker metadata refreshed when the brokers change?
Thanks!
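A simplified model of the check quoted above may make the failure mode easier to see. The data shapes (`topicMetadata`, `brokerMetadata`, `connectedBrokerIds`) and the `leaderLessOrDisconnected` variant are hypothetical illustrations, not kafka-node internals: the first check passes as long as the dead leader is still listed in the cached metadata, while the variant also treats a disconnected leader as missing, which would force a metadata refresh.

```javascript
// Hypothetical, simplified model of the quoted hasMetadata/leaderLessPayloads
// check. Data shapes are illustrative assumptions, not kafka-node internals.

// topicMetadata: { [topic]: { [partition]: { leader: brokerId } } }
function leaderByPartition(topicMetadata, topic, partition) {
  const partitions = topicMetadata[topic];
  const info = partitions ? partitions[partition] : undefined;
  return info === undefined ? undefined : info.leader;
}

// Like the quoted Client.js code: true as long as the cached leader id still
// has an entry in brokerMetadata, even if that broker is actually down.
function hasMetadata(state, topic, partition) {
  const leader = leaderByPartition(state.topicMetadata, topic, partition);
  return leader !== undefined && Boolean(state.brokerMetadata[leader]);
}

function leaderLessPayloads(state, payloads) {
  return payloads.filter((p) => !hasMetadata(state, p.topic, p.partition));
}

// Hypothetical variant: also count a payload as leaderless when there is no
// live connection to its cached leader, so a metadata refresh would be
// triggered instead of waiting on the dead broker.
function leaderLessOrDisconnected(state, payloads) {
  return payloads.filter((p) => {
    if (!hasMetadata(state, p.topic, p.partition)) return true;
    const leader = leaderByPartition(state.topicMetadata, p.topic, p.partition);
    return !state.connectedBrokerIds.has(String(leader));
  });
}

// Scenario from this thread: broker 1 led topic1/0 and was killed, but the
// cached metadata still lists it.
const state = {
  topicMetadata: { topic1: { 0: { leader: 1 } } },
  brokerMetadata: { 1: { host: 'host1' }, 2: { host: 'host2' }, 3: { host: 'host3' } },
  connectedBrokerIds: new Set(['2', '3']),
};
const payloads = [{ topic: 'topic1', partition: 0 }];
console.log(leaderLessPayloads(state, payloads).length);       // 0
console.log(leaderLessOrDisconnected(state, payloads).length); // 1
```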
What's more, I found that brokerMetadata is updated when ZooKeeper emits the brokersChanged event:
```javascript
zk.on('brokersChanged', function (brokerMetadata) {
  try {
    self.brokerMetadata = brokerMetadata;
    logger.debug('brokersChanged', brokerMetadata);
    self.setupBrokerProfiles(brokerMetadata);
    self.refreshBrokers();
    // Emit after 3 seconds
    setTimeout(function () {
      self.emit('brokersChanged');
    }, 3000);
  } catch (error) {
    self.emit('error', error);
  }
});
```
But with KafkaClient, nothing like this is triggered when I kill a broker.
Let me reproduce the issue:
3 Kafka Nodes: host1:port1,host2:port2,host3:port3
Test topic: topic1
Use KafkaClient with 'host1:port1,host2:port2,host3:port3' as kafkaHost. host1:port1 is the leader broker of topic1.
Thu, 26 Apr 2018 04:26:39 GMT kafka-node:KafkaClient compressing messages if needed
Thu, 26 Apr 2018 04:26:39 GMT kafka-node:KafkaClient checking payload topic/partitions has leaders
Thu, 26 Apr 2018 04:26:39 GMT kafka-node:KafkaClient found 1 connected broker(s)
Thu, 26 Apr 2018 04:26:39 GMT kafka-node:KafkaClient found leaders for all
Thu, 26 Apr 2018 04:26:39 GMT kafka-node:KafkaClient sending request
Thu, 26 Apr 2018 04:26:39 GMT kafka-node:KafkaClient grouped requests by 1 brokers ["1"]
Thu, 26 Apr 2018 04:26:39 GMT kafka-node:KafkaClient missing apiSupport waiting until broker is ready...
Thu, 26 Apr 2018 04:26:39 GMT kafka-node:KafkaClient updating metadatas
Thu, 26 Apr 2018 04:26:39 GMT kafka-node:Client refresh metadata currentAttempt 1
Thu, 26 Apr 2018 04:26:39 GMT kafka-node:KafkaClient updating metadatas ###### stuck 30s
error: [Thu Apr 26 2018 12:27:09 GMT+0800 ][quattroweb][GCOTVMMW7121441][pid: 15980]: Failed to send message to Kafka: TimeoutError: Request timed out after 30000ms
I just want to know: why did my second send fail? It seems that KafkaClient tried to update the metadata, so why did it time out? The third send works because the broker/topic metadata has been updated by then.
Please advise. Thanks a lot!
I also found one more issue: if I restart my Kafka cluster and send a message again, I get the error Kafka: Error: NotLeaderForPartition. I think these two issues are related.
In this case you will need to send the second request again. The issue here is that the leader of that topic/partition is no longer the broker the client believes it is, so the second request is no longer valid. Once the metadata has been updated, the client is able to send the produce request to the correct broker. Regarding the timeout: TCP sockets try to be very forgiving and keep the connection going for a long time before a signal is sent to Node. We implemented our own timeout for this case, and you can configure a lower timeout if you'd like.
Hi, thanks for replying! However, when I debug the code for the second request, it seems that KafkaClient had already fetched the new broker and topic metadata, and the topic's new leader was present in the topic metadata. So why doesn't it use the new leader? I reviewed the KafkaClient code: it fetches metadata before sending and the metadata has been updated, but it still uses the old metadata and keeps trying to reconnect to the killed leader, so it has to wait until the killed broker comes back, and then the request times out... That's what I found today. If I need to send the request again, does that mean I need to add my own retry logic for failures? It would be better if this were resolved in kafka-node...
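One way to act on this advice is a bounded resend helper, sketched below under stated assumptions: `producer` is anything with a kafka-node style `send(payloads, callback)` method, and errors are treated as transient by matching their name or message (`TimeoutError`, "timed out", `NotLeaderForPartition`, `BrokerNotAvailable`). That classification is an assumption you should check against the errors your kafka-node version actually returns. A lower `requestTimeout` in the KafkaClient options shortens each failed attempt.

```javascript
// Hypothetical helper, not a kafka-node API.
// ASSUMPTION: transient errors are recognized by name/message text.
function isRetriable(err) {
  if (!err) return false;
  const text = String(err.name) + ' ' + String(err.message);
  return /TimeoutError|timed out|NotLeaderForPartition|BrokerNotAvailable/.test(text);
}

// Resend `payloads` up to `maxAttempts` times while the error looks transient.
// `schedule` defaults to setTimeout; it is injectable so the logic can be
// exercised without real timers.
function sendWithRetry(producer, payloads, options, callback) {
  const maxAttempts = options.maxAttempts !== undefined ? options.maxAttempts : 3;
  const delayMs = options.delayMs !== undefined ? options.delayMs : 1000;
  const schedule = options.schedule || setTimeout;
  let attempt = 0;

  function attemptSend() {
    attempt += 1;
    producer.send(payloads, (err, result) => {
      if (!err) return callback(null, result);
      if (!isRetriable(err) || attempt >= maxAttempts) return callback(err);
      // Give the client time to refresh metadata and find the new leader.
      schedule(attemptSend, delayMs);
    });
  }

  attemptSend();
}

// Usage (assuming `producer` is a ready kafka-node Producer):
// sendWithRetry(producer, [{ topic: 'topic1', messages: 'hello' }],
//   { maxAttempts: 3, delayMs: 2000 },
//   (err, result) => { if (err) console.error('giving up:', err); });
```

Because a retry only starts after the previous attempt fails, a message sent concurrently by other code can still overtake it; that is the ordering concern the maintainers raise in this thread.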
How did the client get the new broker metadata before sending the second request? KafkaClient only requests metadata if the connection to the broker is broken or if it doesn't have metadata about a topic/partition it's trying to fetch which shouldn't be the case here since the first request went through. If you have more details please attach them.
Oh I see, so it will time out if the topic's leader broker is down. I have now added my own retry logic to send again on timeout. A suggestion: could KafkaClient add an option to retry on timeout, in case the timeout was caused by the leader broker going down (a new leader was elected and new topic/broker metadata can be fetched)? Then no exception would be raised to end my process, and I wouldn't need my own retry feature. If I get time, I will try to implement it.
Thanks!
We leave it up to the user of the module to determine when they should retry requests or give up. It's difficult for the module to do this because it could make a wrong decision which could lead to messages being produced in the wrong order. Order can be very important for certain use cases.
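The ordering concern can be illustrated with a small sketch: if a failed message is retried while later messages are already being sent, the later ones can land first. The hypothetical `SerialSender` below keeps at most one request in flight, so a message is only sent after the previous one (including any retries inside `sendFn`) has finished. `sendFn` is an assumed injected function, not part of kafka-node.

```javascript
// Hypothetical sketch: a FIFO queue with at most one produce request in
// flight, so a retried message can never be overtaken by a later one.
class SerialSender {
  constructor(sendFn) {
    this.sendFn = sendFn; // (item, callback) => void, e.g. a retrying send
    this.queue = [];
    this.busy = false;
  }

  enqueue(item, callback) {
    this.queue.push({ item, callback });
    this._drain();
  }

  _drain() {
    if (this.busy || this.queue.length === 0) return;
    this.busy = true;
    const { item, callback } = this.queue.shift();
    this.sendFn(item, (err, result) => {
      this.busy = false;
      if (callback) callback(err, result);
      // Only now is the next queued message sent.
      this._drain();
    });
  }
}

// Usage (assuming `producer` is a ready kafka-node Producer):
// const sender = new SerialSender((payloads, cb) => producer.send(payloads, cb));
// sender.enqueue([{ topic: 'topic1', messages: 'first' }], onDone);
// sender.enqueue([{ topic: 'topic1', messages: 'second' }], onDone);
```

This sketch moves on to the next message after a permanent failure; an application where order is critical might instead stop the queue and alert, which is exactly the kind of decision the maintainers leave to the user.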
@hyperlink I think there might actually be a problem with reconnecting in version 2.6.1.
I created a TestTopic with 1 partition and replication factor 3.
The producer produces a message at a regular interval. When I kill the leader, the producer isn't able to recover, even for subsequent messages. From what I observed, the client didn't round-robin through the initial broker list to reconnect.
In the test consumer, when I kill two of the three brokers, it is unable to use the third broker to connect.
Producing message, to topic TestTopic
kafka-node:KafkaClient compressing messages if needed +10s
kafka-node:KafkaClient checking payload topic/partitions has leaders +1ms
kafka-node:KafkaClient found leaders for all +0ms
kafka-node:KafkaClient grouped requests by 1 brokers ["1"] +1ms
kafka-node:KafkaClient has apiSupport broker is ready +1ms
kafka-node:KafkaClient Using V2 of produce +36ms
message written back to Kafka {"TestTopic":{"0":126}}
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +8s
kafka-node:KafkaClient Sending versions request to kb1.local.thsp.tech:19092 +4ms
kafka-node:KafkaClient ApiVersions failed with unexpected error { BrokerNotAvailableError: Broker not available
at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
at emitOne (events.js:96:13)
at Socket.emit (events.js:188:7)
at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +2ms
kafka-node:KafkaClient error initialize broker after connect { BrokerNotAvailableError: Broker not available
at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
at emitOne (events.js:96:13)
at Socket.emit (events.js:188:7)
at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +7ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +993ms
Producing message, to topic TestTopic
kafka-node:KafkaClient compressing messages if needed +840ms
kafka-node:KafkaClient checking payload topic/partitions has leaders +1ms
kafka-node:KafkaClient found leaders for all +0ms
kafka-node:KafkaClient grouped requests by 1 brokers ["1"] +0ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +181ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
Producing message, to topic TestTopic
kafka-node:KafkaClient compressing messages if needed +691ms
kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
kafka-node:KafkaClient found leaders for all +1ms
kafka-node:KafkaClient grouped requests by 1 brokers ["1"] +0ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +317ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +1s
kafka-node:KafkaClient longpolling socket [BrokerWrapper kb2.local.thsp.tech:29092 (connected: true) (ready: true) (idle: false)] is waiting +0ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +2ms
kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +13ms
kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +2ms
kafka-node:KafkaClient ApiVersions failed with unexpected error { BrokerNotAvailableError: Broker not available
at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
at emitOne (events.js:96:13)
at Socket.emit (events.js:188:7)
at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +1ms
kafka-node:KafkaClient error initialize broker after connect { BrokerNotAvailableError: Broker not available
at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
at emitOne (events.js:96:13)
at Socket.emit (events.js:188:7)
at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +2ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +351ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +2ms
kafka-node:KafkaClient getApiVersions request timedout probably less than 0.10 using base support +145ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":1,"usable":1},"fetch":{"min":0,"max":1,"usable":1},"offset":{"min":0,"max":0,"usable":0},"metadata":{"min":0,"max":0,"usable":0},"leader":null,"stopReplica":null,"updateMetadata":null,"controlledShutdown":null,"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":1,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":0,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":0},"listGroups":{"min":0,"max":0,"usable":0},"saslHandshake":null,"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":null,"deleteTopics":null} +0ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +499ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +43ms
kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +2ms
kafka-node:KafkaClient ApiVersions failed with unexpected error { BrokerNotAvailableError: Broker not available
at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
at emitOne (events.js:96:13)
at Socket.emit (events.js:188:7)
at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +2ms
kafka-node:KafkaClient error initialize broker after connect { BrokerNotAvailableError: Broker not available
at new BrokerNotAvailableError (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Socket.<anonymous> (/home/thiagopinto/workspace/test-kafka-libs/js/node_modules/kafka-node/lib/kafkaClient.js:630:43)
at emitOne (events.js:96:13)
at Socket.emit (events.js:188:7)
at TCP._handle.close [as _onclose] (net.js:497:12) message: 'Broker not available' } +2ms
kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +8ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +304ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +13ms
kafka-node:KafkaClient getApiVersions request timedout probably less than 0.10 using base support +188ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":1,"usable":1},"fetch":{"min":0,"max":1,"usable":1},"offset":{"min":0,"max":0,"usable":0},"metadata":{"min":0,"max":0,"usable":0},"leader":null,"stopReplica":null,"updateMetadata":null,"controlledShutdown":null,"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":1,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":0,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":0},"listGroups":{"min":0,"max":0,"usable":0},"saslHandshake":null,"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":null,"deleteTopics":null} +0ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +485ms
kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +10ms
kafka-node:KafkaClient ApiVersions failed with unexpected error { Error: read ECONNRESET
at exports._errnoException (util.js:1020:11)
at TCP.onread (net.js:568:26) code: 'ECONNRESET', errno: 'ECONNRESET', syscall: 'read' } +3ms
kafka-node:KafkaClient error initialize broker after connect { Error: read ECONNRESET
at exports._errnoException (util.js:1020:11)
at TCP.onread (net.js:568:26) code: 'ECONNRESET', errno: 'ECONNRESET', syscall: 'read' } +3ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +1ms
kafka-node:KafkaClient Sending versions request to kb2.local.thsp.tech:29092 +11ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +306ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb1.local.thsp.tech:19092 +3ms
kafka-node:KafkaClient getApiVersions request timedout probably less than 0.10 using base support +192ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":1,"usable":1},"fetch":{"min":0,"max":1,"usable":1},"offset":{"min":0,"max":0,"usable":0},"metadata":{"min":0,"max":0,"usable":0},"leader":null,"stopReplica":null,"updateMetadata":null,"controlledShutdown":null,"offsetCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":1,"usable":1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":0,"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max":0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":0,"max":0,"usable":0},"listGroups":{"min":0,"max":0,"usable":0},"saslHandshake":null,"apiVersions":{"min":0,"max":0,"usable":0},"createTopics":null,"deleteTopics":null} +1ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +489ms
kafka-node:KafkaClient kafka-node-client reconnecting to kb2.local.thsp.tech:29092 +13ms
+1
I saw the exact same problem reported by thspinto on kafka-node 2.5.0, and I'm seeing the same error on 2.6.1.
Scenario: 3 brokers running on localhost:9092, 9093, 9094.
Using ConsumerGroup.
When started, both the publisher and the consumer run fine.
The topics are assigned to Leader:0
Forcibly terminate broker id 0.
Sample Consumer Output:
Tue, 05 Jun 2018 23:58:02 GMT kafka-node:KafkaClient checking payload topic/partitions has leaders
Tue, 05 Jun 2018 23:58:02 GMT kafka-node:KafkaClient found leaders for all
Tue, 05 Jun 2018 23:58:02 GMT kafka-node:KafkaClient grouped requests by 1 brokers ["0"]
Tue, 05 Jun 2018 23:58:02 GMT kafka-node:KafkaClient has apiSupport broker is ready
Tue, 05 Jun 2018 23:58:02 GMT kafka-node:KafkaClient Using V2 of fetch
Tue, 05 Jun 2018 23:58:02 GMT kafka-node:KafkaClient checking payload topic/partitions has leaders
Tue, 05 Jun 2018 23:58:02 GMT kafka-node:KafkaClient found leaders for all
Tue, 05 Jun 2018 23:58:02 GMT kafka-node:KafkaClient grouped requests by 1 brokers ["0"]
Tue, 05 Jun 2018 23:58:02 GMT kafka-node:KafkaClient has apiSupport broker is ready
Tue, 05 Jun 2018 23:58:02 GMT kafka-node:KafkaClient Using V2 of fetch
I killed broker 0 here. Then I see...
Tue, 05 Jun 2018 23:58:16 GMT kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092
Tue, 05 Jun 2018 23:58:16 GMT kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092
Tue, 05 Jun 2018 23:58:17 GMT kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092
Tue, 05 Jun 2018 23:58:17 GMT kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092
Tue, 05 Jun 2018 23:58:18 GMT kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092
It never fails over to the other brokers.
If I restart broker 0, the KafkaClient reconnects and continues, as shown below:
Wed, 06 Jun 2018 00:01:30 GMT kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092
Wed, 06 Jun 2018 00:01:30 GMT kafka-node:KafkaClient Sending versions request to localhost:9092
Wed, 06 Jun 2018 00:01:30 GMT kafka-node:KafkaClient Sending versions request to localhost:9092
Wed, 06 Jun 2018 00:01:30 GMT kafka-node:KafkaClient Received versions response from localhost:9092
Wed, 06 Jun 2018 00:01:30 GMT kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"32":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"36":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":6,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":0},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":false},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":false},"deleteTopics":{"min":0,"max":1,"usable":false}}
Wed, 06 Jun 2018 00:01:30 GMT kafka-node:KafkaClient Received versions response from localhost:9092
Similarly, the publisher gets stuck and never fails over, but once the broker is started again, it connects and continues.
I have a similar issue. It seems like it connects and receives metadata and then just gets stuck.
Fri, 15 Jun 2018 14:08:19 GMT kafka-node:KafkaClient Connect attempt 1
Fri, 15 Jun 2018 14:08:19 GMT kafka-node:KafkaClient Trying to connect to host: 10.0.2.2 port: 9092
Fri, 15 Jun 2018 14:08:19 GMT kafka-node:KafkaClient Sending versions request to 10.0.2.2:9092
Fri, 15 Jun 2018 14:08:19 GMT kafka-node:KafkaClient broker socket connected {"host":"10.0.2.2","port":"9092"}
Fri, 15 Jun 2018 14:08:19 GMT kafka-node:KafkaClient Received versions response from 10.0.2.2:9092
Fri, 15 Jun 2018 14:08:19 GMT kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"32":{"min":0,"max":1,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"36":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"38":{"min":0,"max":0,"usable":false},"39":{"min":0,"max":0,"usable":false},"40":{"min":0,"max":0,"usable":false},"41":{"min":0,"max":0,"usable":false},"42":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":7,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":0},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":false},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":false},"deleteTopics":{"min":0,"max":1,"usable":false}}
Fri, 15 Jun 2018 14:08:19 GMT kafka-node:KafkaClient updating metadatas
Fri, 15 Jun 2018 14:08:20 GMT kafka-node:ProducerStream _write
Fri, 15 Jun 2018 14:08:20 GMT kafka-node:KafkaClient compressing messages if needed
Fri, 15 Jun 2018 14:08:20 GMT kafka-node:KafkaClient checking payload topic/partitions has leaders
Fri, 15 Jun 2018 14:08:20 GMT kafka-node:KafkaClient found leaders for all
Fri, 15 Jun 2018 14:08:20 GMT kafka-node:KafkaClient grouped requests by 1 brokers ["0"]
Fri, 15 Jun 2018 14:08:20 GMT kafka-node:KafkaClient missing apiSupport waiting until broker is ready...
Fri, 15 Jun 2018 14:08:21 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:22 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:23 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:24 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:25 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:26 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:27 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:28 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:29 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:30 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:31 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:33 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:34 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:35 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:36 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:37 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:38 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:39 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:40 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:41 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:42 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:43 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:44 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:45 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:46 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:47 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:48 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:49 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
Fri, 15 Jun 2018 14:08:50 GMT kafka-node:KafkaClient kafka-node-client reconnecting to wilson:9092
events.js:167
throw er; // Unhandled 'error' event
^
TimeoutError: Request timed out after 30000ms
at new TimeoutError (/app/node_modules/kafka-node/lib/errors/TimeoutError.js:6:9)
at Timeout.setTimeout [as _onTimeout] (/app/node_modules/kafka-node/lib/kafkaClient.js:760:14)
at ontimeout (timers.js:427:11)
at tryOnTimeout (timers.js:289:5)
at listOnTimeout (timers.js:252:5)
at Timer.processTimers (timers.js:212:10)
Emitted 'error' event at:
at ProducerStream.onerror (/app/node_modules/readable-stream/lib/_stream_readable.js:640:52)
at ProducerStream.emit (events.js:182:13)
at onwriteError (_stream_writable.js:431:12)
at onwrite (_stream_writable.js:456:5)
at async.series (/app/node_modules/kafka-node/lib/kafkaClient.js:937:9)
at /app/node_modules/async/dist/async.js:3888:9
at /app/node_modules/async/dist/async.js:473:16
at iterateeCallback (/app/node_modules/async/dist/async.js:988:17)
at /app/node_modules/async/dist/async.js:969:16
at /app/node_modules/async/dist/async.js:3885:13
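The stack trace shows the process dying because the `TimeoutError` was emitted as an `'error'` event on the ProducerStream with no listener attached, and Node throws unhandled `'error'` events. A minimal guard is sketched below with a plain `EventEmitter` standing in for the stream; `guardErrors` is a hypothetical helper, not part of kafka-node.

```javascript
const events = require('events');

// Hypothetical helper: ensure an emitter such as a ProducerStream has an
// 'error' listener. Without one, emit('error', err) throws err, which is
// what terminated the process above.
function guardErrors(emitter, onError) {
  emitter.on('error', (err) => {
    // Log, count, or hand off to application-level retry logic here.
    onError(err);
  });
  return emitter;
}

// Usage with a plain EventEmitter standing in for the ProducerStream:
const stream = guardErrors(new events.EventEmitter(), (err) => {
  console.error('Kafka write failed:', err.message);
});
stream.emit('error', new Error('Request timed out after 30000ms'));
```

Depending on the stream implementation, a stream that has emitted `'error'` may not accept further writes, so recreating it could still be necessary.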