Every time I start a ConsumerGroupStream with the same hard coded consumer id and group id it consumes messages from the earliest offset rather than the latest even when fromOffset is set to latest. It seems like the offset is set to 1 on consumer group stream.
const opts = {
id: 'consumer1',
kafkaHost: 'kafka01:9092',
groupId: 'consumer-group-1',
autoCommit: true,
protocol: ['roundrobin'],
fromOffset: 'latest',
autoCommitIntervalMs: 1000,
fetchMaxBytes: 1024 * 1024,
fetchMaxWaitMs: 100,
encoding: 'utf8',
fetchMinBytes: 1,
asyncPush: false,
}
const topicsToConsume = typeof topics === 'string' ? topics.split(',') : topics
const cGroupStream = new kafka.ConsumerGroupStream(opts, topicsToConsume)
const messageTransform = new Transform({
objectMode: true,
decodeStrings: true,
transform (message, encoding, done) {
console.log('message', message)
return done()
}
})
cGroupStream.pipe(messageTransform).on('error', errorHandler)
# whenever a new message is produced
kafka-node:ConsumerGroupStream _read called +103ms
kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
kafka-node:KafkaClient found leaders for all +1ms
kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
kafka-node:KafkaClient has apiSupport broker is ready +0ms
kafka-node:KafkaClient checking payload topic/partitions has leaders +1ms
kafka-node:KafkaClient found leaders for all +0ms
kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
kafka-node:KafkaClient has apiSupport broker is ready +0ms
kafka-node:ConsumerGroupStream skipping committing +2ms
# when the service is killed
kafka-node:KafkaClient longpolling socket [BrokerWrapper 172.23.0.4:9092 (connected: true) (ready: true) (idle: false)] is waiting +0ms
^C kafka-node:ConsumerGroupStream committing [ { topic: 'contacts.mutate',
partition: 0,
offset: 1,
metadata: 'm' } ] +6ms
kafka-node:ConsumerGroup data-subject-enhancer leaving group +7ms
kafka-node:KafkaClient close client +6ms
# kafka diagnostics
./kafka-consumer-groups.sh --describe --bootstrap-server kafka:9092 --group consumer-group-1
Note: This will only show information about consumers that use the Java consumer API (non- ZooKeeper-based consumers).
Consumer group 'data_subject_enhancer_v1' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
companies.mutate 0 0 0 0 - - -
contacts.mutate 0 1 9 8 - - -
fullcontact.webhook 0 0 0 0 - - -
Same issue here. Anyone know how to resolve?
@freddyfallon I never figured out a solution using the ConsumerGroupStream so I switched back to the non-streamable ConsumerGroup :/
fromOffset only applies to groups w/o previous commits. Is this a new group without any previous commits?
After some exploration, I have noticed that for me ConsumerGroupStreams are not auto-committing when I am moving to the next chunk in the stream. The only time it auto-commits is when I actually close my ConsumerGroupStream.
I have noticed that for me ConsumerGroupStreams are not auto-committing when I am moving to the next chunk in the stream
Commits should be throttled by autoCommitIntervalMs and not exactly after data event is emitted.
Do you have example code that reproduces the issue?
Hey @hyperlink I have found the issue. It was actually solved in this PR: https://github.com/SOHU-Co/kafka-node/pull/1066
Any idea when the next release will happen?
P. S. appreciate the help.
Published as 3.0.1
Perfect thank you.
FYI you also need my PR before you can reliably use ConsumerGroup(Stream): https://github.com/SOHU-Co/kafka-node/pull/1072
Hopefully will get pulled and published soon.
But sounds like the root issue for this ticket is resolved though, but note you will receive duplicate messages until my PR lands.
Most helpful comment
Published as 3.0.1