Kafka-node: ConsumerGroupStream is going to pause after 15 fetched messages

Created on 17 Nov 2017 · 2 Comments · Source: SOHU-Co/kafka-node

Questions?

Hi,
Why does ConsumerGroupStream stop fetching data after some period of time or after a batch of messages?

Bug Report

I'm using ConsumerGroupStream and trying to read data using pipe. When the process starts fetching data from the topic, it pauses after fetching some amount of data, and no more messages are consumed.

Environment

  • Node version: v7.8.0
  • Kafka-node version: v2.2.3
  • Kafka version: ~v10

For specific cases also provide

  • Number of Brokers: 6
  • Number partitions for topic: [0,1]

Include Sample Code to reproduce behavior

/**
 * Builds a kafka-node ConsumerGroupStream subscribed to a single topic.
 *
 * @param {string} host - Broker address; used both as `kafkaHost` in the
 *   stream options and as the argument to `kafka.Client`.
 * @param {string} topic - Topic passed to the ConsumerGroupStream constructor.
 * @returns The newly constructed `kafka.ConsumerGroupStream`.
 */
function create_consumer(host, topic){
        // NOTE(review): `client` and `payload` are assigned here but never read
        // anywhere else in this function; only `options` and `topic` reach the
        // ConsumerGroupStream constructor below.
        let client = new kafka.Client(host);
        let payload = [{ topic: topic, partition: 1}];

        let options = {
            kafkaHost: host,
            groupId: 'test_x',
            //ssl: true,
            sessionTimeout: 30000,
            // NOTE(review): an auto-commit interval is set while `autoCommit` is
            // false below; presumably the interval has no effect in that case.
            // Confirm against the kafka-node documentation.
            autoCommitIntervalMs: 100,
            protocol: ['roundrobin'],
            autoCommit: false,
            fetchMaxBytes: 1024 * 1024,
            fromOffset: 'earliest',
            outOfRangeOffset: 'earliest'
        };
        return new kafka.ConsumerGroupStream(options, topic);

    }
// Create the consumer stream and pipe it through an object-mode through2
// transform that logs each message, commits it, and forwards it downstream.
let sm_consumer = create_consumer('host_name:9092', 'topic_messages');
// NOTE(review): the stream returned by through2.obj(...) is not piped anywhere
// and has no 'data' listener attached here, so nothing consumes the objects
// forwarded by cb(null, data). This is the snippet that reproduces the
// reported stall.
sm_consumer.pipe(
        through2.obj(function (data, enc, cb) {
                                     console.log(data);
                                     sm_consumer.commit(data);
                        // Pass the message on to the transform's readable side.
                        cb(null, data);
           })
);

Include output with Debug turned on

kafka-node:KafkaClient Connect attempt 1 +0ms
kafka-node:KafkaClient Trying to connect to host: host_name-01.v2 port: 9
092 +4ms
kafka-node:ConsumerGroupStream _read called +17ms
kafka-node:ConsumerGroupStream consumerGroup is not ready, calling consumerGro
up.connect +0ms
kafka-node:ConsumerGroup Connecting kafka-node-client +1ms
kafka-node:KafkaClient Sending versions request to host_name-01.v2:9092 +
100ms
kafka-node:KafkaClient broker socket connected {"host":"host_name-01.v2",
"port":"9092"} +3ms
kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'host
name-04.v2',
coordinatorPort: 9092,
coordinatorId: 4 } +96ms
kafka-node:KafkaClient found 1 connected broker(s) +4ms
kafka-node:ConsumerGroupRecovery tryToRecoverFrom connect { BrokerNotAvailable
Error
at new BrokerNotAvailableError (C:\some_link\some_dir\node_modules\kaf
ka-node\lib\errorsBrokerNotAvailableError.js:11:9)
at KafkaClient.Client.sendGroupRequest (C:\some_link\some_dir\node_mod
ules\kafka-node\lib\client.js:296:15)
at KafkaClient.Client.sendJoinGroupRequest (C:\some_link\some_dir\node
_modules\kafka-node\lib\client.js:308:8)
at C:\some_link\some_dir\node_modules\kafka-node\lib\consumerGroup.js:
453:21
at nextTask (C:\some_link\some_dir\node_modules\async\dist\async.js:53
10:14)
at next (C:\some_link\some_dir\node_modules\async\dist\async.js:5317:9
)
at C:\some_link\some_dir\node_modules\async\dist\async.js:958:16
at KafkaClient.wrappedFn (C:\some_link\some_dir\node_modules\kafka-nod
e\lib\kafkaClient.js:374:14)
at KafkaClient.Client.handleReceivedData (C:\some_link\some_dir\node_m
odules\kafka-node\lib\client.js:764:60)
at Socket. (C:\some_link\some_dir\node_modules\kafka-node\l
ib\kafkaClient.js:523:10) message: 'Broker not available' } +2ms
kafka-node:ConsumerGroupRecovery RECOVERY from connect: kafka-node-client retr
ying in 1000 ms { BrokerNotAvailableError
at new BrokerNotAvailableError (C:\some_link\some_dir\node_modules\kaf
ka-node\lib\errorsBrokerNotAvailableError.js:11:9)
at KafkaClient.Client.sendGroupRequest (C:\some_link\some_dir\node_mod
ules\kafka-node\lib\client.js:296:15)
at KafkaClient.Client.sendJoinGroupRequest (C:\some_link\some_dir\node
_modules\kafka-node\lib\client.js:308:8)
at C:\some_link\some_dir\node_modules\kafka-node\lib\consumerGroup.js:
453:21
at nextTask (C:\some_link\some_dir\node_modules\async\dist\async.js:53
10:14)
at next (C:\some_link\some_dir\node_modules\async\dist\async.js:5317:9
)
at C:\some_link\some_dir\node_modules\async\dist\async.js:958:16
at KafkaClient.wrappedFn (C:\some_link\some_dir\node_modules\kafka-nod
e\lib\kafkaClient.js:374:14)
at KafkaClient.Client.handleReceivedData (C:\some_link\some_dir\node_m
odules\kafka-node\lib\client.js:764:60)
at Socket. (C:\some_link\some_dir\node_modules\kafka-node\l
ib\kafkaClient.js:523:10) message: 'Broker not available' } +3ms
kafka-node:KafkaClient Received versions response from host_name-01.v2:90
92 +81ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usa
ble":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":
0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":fa
lse},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"m
ax":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"off
setCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":
1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,
"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max"
:0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":
0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHan
dshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable"
:0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max
":0,"usable":false}} +2ms
kafka-node:KafkaClient updating metadatas +209ms
kafka-node:KafkaClient updating metadatas +115ms
kafka-node:ConsumerGroup Connecting kafka-node-client +595ms
kafka-node:ConsumerGroup GroupCoordinator Response: { coordinatorHost: 'host
name-04.v2',
coordinatorPort: 9092,
coordinatorId: 4 } +91ms
kafka-node:KafkaClient Sending versions request to host_name-04.v2:9092 +
95ms
kafka-node:ConsumerGroup joinGroupResponse {"members":[{"subscription":["topic
name"],"version":0,"id":"kafka-node-client-a3898abe-a764-42fe-a7df-
41d123asdad"}],"generationId":16,"groupProtocol":"roundrobin","leaderId":"kafka
-node-client-b389asdas-a764-42fe-a7df-asd23413asd","memberId":"kafka-node-client
-b389asdas-a764-42fe-a7df-asd23413asd"} from kafka-node-client +93ms
kafka-node:ConsumerGroup Assigning Partitions to members [ { subscription: [ '
topic_name' ],
version: 0,
userData: undefined,
id: 'kafka-node-client-b389asdas-a764-42fe-a7df-asd23413asd' } ] +1ms
kafka-node:ConsumerGroup Using group protocol roundrobin +2ms
kafka-node:ConsumerGroup loadingMetadata for topics: [ 'topic_name
' ] +3ms
kafka-node:KafkaClient Received versions response from host_name-04.v2:90
92 +81ms
kafka-node:KafkaClient setting api support to {"produce":{"min":0,"max":2,"usa
ble":2},"fetch":{"min":0,"max":3,"usable":0},"offset":{"min":0,"max":1,"usable":
0},"metadata":{"min":0,"max":2,"usable":0},"leader":{"min":0,"max":0,"usable":fa
lse},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"m
ax":3,"usable":false},"controlledShutdown":{"min":1,"max":1,"usable":false},"off
setCommit":{"min":0,"max":2,"usable":2},"offsetFetch":{"min":0,"max":2,"usable":
1},"groupCoordinator":{"min":0,"max":0,"usable":0},"joinGroup":{"min":0,"max":1,
"usable":0},"heartbeat":{"min":0,"max":0,"usable":0},"leaveGroup":{"min":0,"max"
:0,"usable":0},"syncGroup":{"min":0,"max":0,"usable":0},"describeGroups":{"min":
0,"max":0,"usable":false},"listGroups":{"min":0,"max":0,"usable":false},"saslHan
dshake":{"min":0,"max":0,"usable":false},"apiVersions":{"min":0,"max":0,"usable"
:0},"createTopics":{"min":0,"max":1,"usable":false},"deleteTopics":{"min":0,"max
":0,"usable":false}} +0ms
kafka-node:ConsumerGroup mapTopicToPartitions { topic_name: [ '0',
'1' ] } +11ms
kafka-node:Roundrobin topicPartition: {"topic_name":["0","1"]} +1m
s
kafka-node:Roundrobin groupMembers: [{"subscription":["topic_name"
],"version":0,"id":"kafka-node-client-b389asdas-a764-42fe-a7df-asd23413asd"}] +1
ms
kafka-node:Roundrobin members [ 'kafka-node-client-b389asdas-a764-42fe-a7df-asd23413asd
123asdad' ] +1ms
kafka-node:Roundrobin subscribers { 'kafka-node-client-a3898abe-a764-42fe-a7df
-41d123asdad': [ 'topic_name' ] } +1ms
kafka-node:Roundrobin round robin on topic partition pairs: [ { topic: 'topic
name', partition: '0' },
{ topic: 'topic_name', partition: '1' } ] +1ms
kafka-node:ConsumerGroup SyncGroup Request from kafka-node-client-a3898abe-a76
4-42fe-a7df-41d123asdad +2ms
kafka-node:ConsumerGroup SyncGroup Response +98ms
kafka-node:ConsumerGroup kafka-node-client owns topics: { topicpn_messa
ges: [ 0, 1 ] } +1ms
kafka-node:ConsumerGroup kafka-node-client fetchOffset Response: {"topic
name":{"0":8326,"1":8387}} +98ms
kafka-node:ConsumerGroup Has saved offsets +0ms
kafka-node:ConsumerGroup generationId 16 +2ms
kafka-node:ConsumerGroup startFetch is true +0ms
kafka-node:ConsumerGroup kafka-node-client started heartbeats at every 10000 m
s +6ms

Thanks for your contribution!

Most helpful comment

Your through2 stream should be piped to a writable stream, or have a `data` event listener attached, as shown in the example at https://www.npmjs.com/package/through2 . By default, Node.js streams operating in object mode buffer up to 16 objects (the default highWaterMark); beyond that point backpressure is applied, which in effect pauses your ConsumerGroup stream.

// Snippet from the report: the through2 transform's output is not piped
// onward and has no 'data' listener, so the forwarded objects are never read.
sm_consumer.pipe(
        through2.obj(function (data, enc, cb) {
                                     console.log(data);
                                     sm_consumer.commit(data);
                        cb(null, data);
           })
);

All 2 comments

Your through2 stream should be piped to a writable stream, or have a `data` event listener attached, as shown in the example at https://www.npmjs.com/package/through2 . By default, Node.js streams operating in object mode buffer up to 16 objects (the default highWaterMark); beyond that point backpressure is applied, which in effect pauses your ConsumerGroup stream.

// Snippet from the report: the through2 transform's output is not piped
// onward and has no 'data' listener, so the forwarded objects are never read.
sm_consumer.pipe(
        through2.obj(function (data, enc, cb) {
                                     console.log(data);
                                     sm_consumer.commit(data);
                        cb(null, data);
           })
);

@vigneshnrfs thank you !

Was this page helpful?
0 / 5 - 0 ratings

Related issues

chetandev picture chetandev · 5 Comments

twawszczak picture twawszczak · 6 Comments

comrat picture comrat · 5 Comments

Sonivaibhav26 picture Sonivaibhav26 · 5 Comments

sergeyjsg picture sergeyjsg · 4 Comments