[2017-11-25 09:34:41.065] [WARN] info - { Error: Local: Broker transport failure
origin: 'local',
message: 'broker transport failure',
code: -1,
errno: -1,
stack: 'Error: Local: Broker transport failure' }
[2017-11-25 09:34:42.355] [WARN] info - { Error: Local: Broker transport failure
origin: 'local',
message: 'broker transport failure',
code: -1,
errno: -1,
stack: 'Error: Local: Broker transport failure' }
My node-rdkafka consuming code:
const Kafka = require('node-rdkafka')
const log = require('./log')
// Runtime environment name; falls back to 'development' when NODE_ENV is unset.
const env = process.env.NODE_ENV || 'development'

// Readable stream over the 'userlog' topic. Arguments are, in order:
// global consumer config, topic config, and stream options.
const stream = Kafka.KafkaConsumer.createReadStream(
  {
    'metadata.broker.list': 'logkfk1:19092,logkfk2:19092',
    'group.id': 'node-consumer-production2',
    'socket.keepalive.enable': true,
    // Offsets are committed manually from the 'data' handler.
    'enable.auto.commit': false,
    // Do not report broker disconnects. The periodic
    // "Local: Broker transport failure" event.error entries appear to come
    // from the broker closing idle connections (librdkafka reconnects on its
    // own), so reporting each close is noise. NOTE(review): this also hides
    // disconnects with other causes -- confirm that is acceptable.
    'log.connection.close': false
  },
  {
    'auto.offset.reset': 'earliest'
  },
  {
    topics: ['userlog']
  }
)

// Number of messages received so far; incremented by the 'data' handler.
let counter = 0
// Offsets are committed once every `numMessages` received messages.
const numMessages = 10
// Per-message handler: the stream is paused while one message is processed
// and resumed afterwards, whether processing succeeded or failed.
stream.on('data', async (message) => {
  stream.pause()
  counter += 1
  try {
    const entry = JSON.parse(message.value)
    const actType = entry.ul_actType
    // Only these action types are forwarded to log.logArticle.
    const articleActTypes = [1000, 1010, 1030, 2000]
    if (articleActTypes.includes(actType)) {
      await log.logArticle(entry.ul_targetId, actType, entry.ul_addTime, entry.ul_userId)
    }
    // Commit once per `numMessages` received messages.
    const batchComplete = counter % numMessages === 0
    if (batchComplete) {
      stream.consumer.commit()
    }
  } catch (error) {
    // Parse/processing failures are logged and the message is skipped.
    __logger.error(error.message)
  } finally {
    stream.resume()
  }
})
// Stream-level errors are logged, then the process exits with status 1.
stream.on('error', (err) => {
  __logger.error(err)
  process.exit(1)
})
// Consumer 'event.error' events are only logged at warn level.
// This is where the error shows up, roughly every 10~15 minutes.
stream.consumer.on('event.error', (err) => {
  __logger.warn(err)
})
The kafka cluster info:
$ ./bin/kafka-topics.sh --describe --zookeeper localhost:12181 --topic userlog
Topic:userlog PartitionCount:4 ReplicationFactor:2 Configs:
Topic: userlog Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: userlog Partition: 1 Leader: 2 Replicas: 2,1 Isr: 1,2
Topic: userlog Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: userlog Partition: 3 Leader: 2 Replicas: 2,1 Isr: 1,2
Did I miss some config options?
Thanks.
This is generally indicative of some network issues, or broker problems. Is it on a fixed interval? What version of Kafka are you using for your brokers?
@kidlj I've seen that as well due to the broker reaping idle connections. Would be nice if we could see the underlying reason for the disconnect but I think this library & librdkafka are simply relaying what the broker is telling them. Maybe try log.connection.close=false if too noisy.
@webmakersteve I'm using kafka_2.10-0.10.0.0 and yes, it is on a fixed interval, about once every 10~15 minutes.
@erik-stephens Thanks for your explanation, sir.
Yep, 10 minute interval, pretty sure this is just dubious logging when the idle reaper comes calling. It's pretty annoying that it ends up reporting it multiple times, but I think I'd rather write a debouncer in our wrapper than ignore all connection close messages.
Going to close this issue. I believe it is pretty safe to ignore the connection close messages the vast majority of the time. They will still manifest as broker errors when transactions irreparably fail, but librdkafka deals with reconnection internally.
Most helpful comment
@kidlj I've seen that as well due to the broker reaping idle connections. Would be nice if we could see the underlying reason for the disconnect but I think this library & librdkafka are simply relaying what the broker is telling them. Maybe try
log.connection.close=false if too noisy.