Hi,
Why does ConsumerGroupStream stop fetching data after some period of time or after a batch of messages?
I'm using ConsumerGroupStream and trying to read data using pipe. When the process starts fetching data from the topic, it pauses after fetching some amount of data, and no more messages are consumed.
/**
 * Build a kafka-node ConsumerGroupStream for a single topic.
 *
 * The stream creates its own broker connection from `options.kafkaHost`, so no
 * separate client object is constructed here (an unused, never-closed client
 * would only leak a connection).
 *
 * @param {string} host - Broker address passed as `kafkaHost`, e.g. 'host_name:9092'.
 * @param {string} topic - Topic handed to the ConsumerGroupStream constructor.
 * @returns {object} The newly created `kafka.ConsumerGroupStream` instance.
 */
function create_consumer(host, topic) {
  const options = {
    kafkaHost: host,
    groupId: 'test_x',
    //ssl: true,
    sessionTimeout: 30000,
    autoCommitIntervalMs: 100,
    protocol: ['roundrobin'],
    // Offsets are committed explicitly by the caller.
    autoCommit: false,
    fetchMaxBytes: 1024 * 1024,
    fromOffset: 'earliest',
    outOfRangeOffset: 'earliest'
  };
  return new kafka.ConsumerGroupStream(options, topic);
}
// Create the consumer stream for 'topic_messages' on 'host_name:9092'.
let sm_consumer = create_consumer('host_name:9092', 'topic_messages');

// Object-mode transform: log each message, commit it on the consumer stream,
// then forward it unchanged. Nothing is attached downstream of this transform.
const logAndCommit = through2.obj((message, _encoding, done) => {
  console.log(message);
  sm_consumer.commit(message);
  done(null, message);
});

sm_consumer.pipe(logAndCommit);
kafka-node:KafkaClient Connect attempt 1 +0ms
kafka-node:KafkaClient Trying to connect to host: host_name-01.v2 port: 9
092 +4ms
kafka-node:ConsumerGroupStream _read called +17ms
kafka-node:ConsumerGroupStream consumerGroup is not ready, calling consumerGro
up.connect +0ms
kafka-node:ConsumerGroup Connecting kafka-node-client +1ms
kafka-node:KafkaClient Sending versions request to host_name-01.v2:9092 +
100ms
kafka-node:KafkaClient broker socket connected {"host":"host_name-01.v2",
"port":"9092"} +3ms
kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'host
name-04.v2',
coordinatorPort: 9092,
coordinatorId: 4 } +96ms
kafka-node:KafkaClient found 1 connected broker(s) +4ms
kafka-node:ConsumerGroupRecovery tryToRecoverFrom connect { BrokerNotAvailable
Error
at new BrokerNotAvailableError (C:\some_link\some_dir\node_modules\kaf
ka-node\lib\errorsBrokerNotAvailableError.js:11:9)
at KafkaClient.Client.sendGroupRequest (C:\some_link\some_dir\node_mod
ules\kafka-node\lib\client.js:296:15)
at KafkaClient.Client.sendJoinGroupRequest (C:\some_link\some_dir\node
_modules\kafka-node\lib\client.js:308:8)
at C:\some_link\some_dir\node_modules\kafka-node\lib\consumerGroup.js:
453:21
at nextTask (C:\some_link\some_dir\node_modules\async\dist\async.js:53
10:14)
at next (C:\some_link\some_dir\node_modules\async\dist\async.js:5317:9
)
at C:\some_link\some_dir\node_modules\async\dist\async.js:958:16
at KafkaClient.wrappedFn (C:\some_link\some_dir\node_modules\kafka-nod
e\lib\kafkaClient.js:374:14)
at KafkaClient.Client.handleReceivedData (C:\some_link\some_dir\node_m
odules\kafka-node\lib\client.js:764:60)
at Socket.
ib\kafkaClient.js:523:10) message: 'Broker not available' } +2ms
kafka-node:ConsumerGroupRecovery RECOVERY from connect: kafka-node-client retr
ying in 1000 ms { BrokerNotAvailableError
at new BrokerNotAvailableError (C:\some_link\some_dir\node_modules\kaf
ka-node\lib\errorsBrokerNotAvailableError.js:11:9)
at KafkaClient.Client.sendGroupRequest (C:\some_link\some_dir\node_mod
ules\kafka-node\lib\client.js:296:15)
at KafkaClient.Client.sendJoinGroupRequest (C:\some_link\some_dir\node
_modules\kafka-node\lib\client.js:308:8)
at C:\some_link\some_dir\node_modules\kafka-node\lib\consumerGroup.js:
453:21
at nextTask (C:\some_link\some_dir\node_modules\async\dist\async.js:53
10:14)
at next (C:\some_link\some_dir\node_modules\async\dist\async.js:5317:9
)
at C:\some_link\some_dir\node_modules\async\dist\async.js:958:16
at KafkaClient.wrappedFn (C:\some_link\some_dir\node_modules\kafka-nod
e\lib\kafkaClient.js:374:14)
at KafkaClient.Client.handleReceivedData (C:\some_link\some_dir\node_m
odules\kafka-node\lib\client.js:764:60)
at Socket.
ib\kafkaClient.js:523:10) message: 'Broker not available' } +3ms
kafka-node:KafkaClient Received versions response from host_name-01.v2:90
92 +81ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usa
ble":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":
0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":fa
lse},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"m
ax":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"off
setCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":
1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,
"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max"
:0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":
0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHan
dshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable"
:0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max
":0,"usable":false}} +2ms
kafka-node:KafkaClient updating metadatas +209ms
kafka-node:KafkaClient updating metadatas +115ms
kafka-node:ConsumerGroup Connecting kafka-node-client +595ms
kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'host
name-04.v2',
coordinatorPort: 9092,
coordinatorId: 4 } +91ms
kafka-node:KafkaClient Sending versions request to host_name-04.v2:9092 +
95ms
kafka-node:ConsumerGroup joinGroupResponse {"members":[{"subscription":["topic
name"],"version":0,"id":"kafka-node-client-a3898abe-a764-42fe-a7df-
41d123asdad"}],"generationId":16,"groupProtocol":"roundrobin","leaderId":"kafka
-node-client-b389asdas-a764-42fe-a7df-asd23413asd","memberId":"kafka-node-client
-b389asdas-a764-42fe-a7df-asd23413asd"} from kafka-node-client +93ms
kafka-node:ConsumerGroup Assigning Partitions to members [ { subscription: [ '
topic_name' ],
version: 0,
userData: undefined,
id: 'kafka-node-client-b389asdas-a764-42fe-a7df-asd23413asd' } ] +1ms
kafka-node:ConsumerGroup Using group protocol roundrobin +2ms
kafka-node:ConsumerGroup loadingMetadata for topics: [ 'topic_name
' ] +3ms
kafka-node:KafkaClient Received versions response from host_name-04.v2:90
92 +81ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usa
ble":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":
0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":fa
lse},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"m
ax":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"off
setCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":
1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,
"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max"
:0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":
0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHan
dshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable"
:0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max
":0,"usable":false}} +0ms
kafka-node:ConsumerGroup mapTopicToPartitions { topic_name: [ '0',
'1' ] } +11ms
kafka-node:Roundrobin topicPartition: {"topic_name":["0","1"]} +1m
s
kafka-node:Roundrobin groupMembers: [{"subscription":["topic_name"
],"version":0,"id":"kafka-node-client-b389asdas-a764-42fe-a7df-asd23413asd"}] +1
ms
kafka-node:Roundrobin members [ 'kafka-node-client-b389asdas-a764-42fe-a7df-asd23413asd
123asdad' ] +1ms
kafka-node:Roundrobin subscribers { 'kafka-node-client-a3898abe-a764-42fe-a7df
-41d123asdad': [ 'topic_name' ] } +1ms
kafka-node:Roundrobin round robin on topic partition pairs: [ { topic: 'topic
name', partition: '0' },
{ topic: 'topic_name', partition: '1' } ] +1ms
kafka-node:ConsumerGroup SyncGroup Request from kafka-node-client-a3898abe-a76
4-42fe-a7df-41d123asdad +2ms
kafka-node:ConsumerGroup SyncGroup Response +98ms
kafka-node:ConsumerGroup kafka-node-client owns topics: { topicpn_messa
ges: [ 0, 1 ] } +1ms
kafka-node:ConsumerGroup kafka-node-client fetchOffset Response: {"topic
name":{"0":8326,"1":8387}} +98ms
kafka-node:ConsumerGroup Has saved offsets +0ms
kafka-node:ConsumerGroup generationId 16 +2ms
kafka-node:ConsumerGroup startFetch is true +0ms
kafka-node:ConsumerGroup kafka-node-client started heartbeats at every 10000 m
s +6ms
Thanks for your contribution!
Your through2 stream should pipe the data to a writable stream, or have an event listener attached for `data`, as shown in the example at https://www.npmjs.com/package/through2 . By default, Node.js streams operating in object mode can keep 16 objects (highWaterMark) in their buffer; beyond that point back pressure is applied, which in effect pauses your ConsumerGroup stream.
// Log and commit every message, then forward it downstream.
sm_consumer
  .pipe(
    through2.obj(function (data, enc, cb) {
      console.log(data);
      sm_consumer.commit(data);
      cb(null, data);
    })
  )
  // Drain the transform's readable side. Without a consumer its object-mode
  // buffer fills up (16 objects by default), back pressure kicks in and the
  // ConsumerGroupStream is paused.
  .on('data', function () {});
@vigneshnrfs thank you !
Most helpful comment
Your through2 stream should pipe the data to a writable stream, or have an event listener attached for `data`, as shown in the example at https://www.npmjs.com/package/through2 . By default, Node.js streams operating in object mode can keep 16 objects (highWaterMark) in their buffer; beyond that point back pressure is applied, which in effect pauses your ConsumerGroup stream.