Kafka-node: ConsumerGroupStream not respecting latest offset

Created on 1 Jun 2018 · 9 Comments · Source: SOHU-Co/kafka-node

Bug Report

Every time I start a ConsumerGroupStream with the same hard-coded consumer id and group id, it consumes messages from the earliest offset rather than the latest, even though fromOffset is set to 'latest'. It seems like the committed offset for the consumer group stream stays at 1.

Environment

  • Node version: v8.3.0
  • Kafka-node version: v2.6.1
  • Kafka version: v0.10.2.1

Sample code to reproduce the behavior

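const kafka = require('kafka-node')
const { Transform } = require('stream')

// Example values (assumed); in the original service these are defined elsewhere.
const topics = 'contacts.mutate'
const errorHandler = (err) => console.error('kafka error', err)

const opts = {
  id: 'consumer1',
  kafkaHost: 'kafka01:9092',
  groupId: 'consumer-group-1',
  autoCommit: true,
  protocol: ['roundrobin'],
  fromOffset: 'latest',
  autoCommitIntervalMs: 1000,
  fetchMaxBytes: 1024 * 1024,
  fetchMaxWaitMs: 100,
  encoding: 'utf8',
  fetchMinBytes: 1,
  asyncPush: false
}

// Accept either a comma-separated string or an array of topic names.
const topicsToConsume = typeof topics === 'string' ? topics.split(',') : topics
const cGroupStream = new kafka.ConsumerGroupStream(opts, topicsToConsume)

// Log each message; nothing is pushed downstream.
const messageTransform = new Transform({
  objectMode: true,
  decodeStrings: true,
  transform (message, encoding, done) {
    console.log('message', message)
    return done()
  }
})

// .pipe() does not forward errors from the source, so listen on both streams.
cGroupStream.on('error', errorHandler)
cGroupStream.pipe(messageTransform).on('error', errorHandler)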

Relevant debug logs

  # whenever a new message is produced

  kafka-node:ConsumerGroupStream _read called +103ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +1ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +1ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:ConsumerGroupStream skipping committing +2ms


  # when the service is killed

  kafka-node:KafkaClient longpolling socket [BrokerWrapper 172.23.0.4:9092 (connected: true) (ready: true) (idle: false)] is waiting +0ms
^C  kafka-node:ConsumerGroupStream committing [ { topic: 'contacts.mutate',
  partition: 0,
  offset: 1,
  metadata: 'm' } ] +6ms
  kafka-node:ConsumerGroup data-subject-enhancer leaving group +7ms
  kafka-node:KafkaClient close client +6ms


  # kafka diagnostics

  ./kafka-consumer-groups.sh --describe --bootstrap-server kafka:9092 --group consumer-group-1
  Note: This will only show information about consumers that use the Java consumer API (non- ZooKeeper-based consumers).

  Consumer group 'data_subject_enhancer_v1' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
companies.mutate               0          0               0               0          -                                                 -                              -
contacts.mutate                0          1               9               8          -                                                 -                              -
fullcontact.webhook            0          0               0               0          -                                                 -                              -
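
For reference, LAG in this output is LOG-END-OFFSET minus CURRENT-OFFSET. The small sketch below recomputes it from the three rows above (values copied by hand; the `lag` helper is illustrative and not part of kafka-node or the Kafka tooling). It makes the symptom concrete: the group's committed offset for contacts.mutate is still 1, so a restarted consumer resumes there and replays the 8 messages after it.

```javascript
// Rows transcribed by hand from the kafka-consumer-groups.sh output above.
const rows = [
  { topic: 'companies.mutate', partition: 0, currentOffset: 0, logEndOffset: 0 },
  { topic: 'contacts.mutate', partition: 0, currentOffset: 1, logEndOffset: 9 },
  { topic: 'fullcontact.webhook', partition: 0, currentOffset: 0, logEndOffset: 0 }
]

// Illustrative helper: messages between the committed offset and the log end.
function lag ({ currentOffset, logEndOffset }) {
  return logEndOffset - currentOffset
}

for (const row of rows) {
  console.log(`${row.topic}[${row.partition}] lag=${lag(row)}`)
}
// Prints lag=0, lag=8, lag=0. For contacts.mutate the group resumes at the
// committed offset 1 and re-reads those 8 messages on restart.
```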

All 9 comments

Same issue here. Does anyone know how to resolve it?

@freddyfallon, I never figured out a solution using the ConsumerGroupStream, so I switched back to the non-stream ConsumerGroup :/

fromOffset only applies to groups without previous commits. Is this a new group without any previous commits?
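
A rough way to picture this rule, assuming a partition's committed offset is either a number or absent (modelled here as `null`/`undefined`): an existing commit always wins, and fromOffset is consulted only when there is none. `resolveStartOffset` is a hypothetical helper written for illustration, not kafka-node's actual code, and it simply throws for any fromOffset other than 'earliest' or 'latest'.

```javascript
// Hypothetical helper illustrating the rule above; not kafka-node internals.
// `committed` is the group's committed offset for a partition, or
// null/undefined if the group never committed there.
function resolveStartOffset (committed, fromOffset, earliest, latest) {
  if (committed !== null && committed !== undefined) {
    // An existing commit always wins; fromOffset is ignored.
    return committed
  }
  switch (fromOffset) {
    case 'earliest':
      return earliest
    case 'latest':
      return latest
    default:
      // Other values are out of scope for this sketch.
      throw new Error(`no committed offset and fromOffset is '${fromOffset}'`)
  }
}

// Values from the diagnostics in the report: committed offset 1, log end 9.
console.log(resolveStartOffset(1, 'latest', 0, 9)) // 1
// A brand-new group with no commits would start at the log end.
console.log(resolveStartOffset(null, 'latest', 0, 9)) // 9
```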

After some exploration, I noticed that my ConsumerGroupStreams are not auto-committing as I move on to the next chunk in the stream. The only time they commit is when I actually close the ConsumerGroupStream.

> I have noticed that for me ConsumerGroupStreams are not auto-committing when I am moving to the next chunk in the stream

Commits should be throttled by autoCommitIntervalMs, not made immediately after each data event is emitted.
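
To illustrate what throttling by autoCommitIntervalMs means, here is a minimal sketch, not kafka-node's implementation: every data event records the latest offset per partition, but a commit is only issued when at least `intervalMs` has passed since the previous one, and `flush()` commits whatever is left (e.g. on close). `ThrottledCommitter`, `commitFn`, and the injectable clock are hypothetical names chosen for the example; storing offset + 1 follows the Java consumer's convention of committing the next offset to read.

```javascript
// Hypothetical sketch of interval-throttled committing; not kafka-node code.
// `commitFn` receives [{ topic, partition, offset }] and stands in for a real
// commit request; `now` is injectable so the example can use a fake clock.
class ThrottledCommitter {
  constructor (commitFn, intervalMs, now = Date.now) {
    this.commitFn = commitFn
    this.intervalMs = intervalMs
    this.now = now
    this.pending = new Map() // "topic:partition" -> { topic, partition, offset }
    this.lastCommit = -Infinity
  }

  // Call on every data event; commits only if intervalMs has elapsed.
  track (message) {
    const key = `${message.topic}:${message.partition}`
    // Store the next offset to read (last processed + 1).
    this.pending.set(key, { topic: message.topic, partition: message.partition, offset: message.offset + 1 })
    if (this.now() - this.lastCommit >= this.intervalMs) this.flush()
  }

  // Commit whatever is pending, e.g. from a timer or when closing.
  flush () {
    if (this.pending.size === 0) return
    this.commitFn([...this.pending.values()])
    this.pending.clear()
    this.lastCommit = this.now()
  }
}

// Usage with a fake clock and a 1000 ms interval.
let t = 0
const commits = []
const committer = new ThrottledCommitter(batch => commits.push(batch), 1000, () => t)
committer.track({ topic: 'contacts.mutate', partition: 0, offset: 1 }) // first commit
t = 500
committer.track({ topic: 'contacts.mutate', partition: 0, offset: 2 }) // pending only
t = 1200
committer.track({ topic: 'contacts.mutate', partition: 0, offset: 3 }) // commits offset 4
console.log(commits.length) // 2
```

The sketch only checks the interval when a message arrives; a real implementation would also need a timer or a flush on close so trailing offsets are not left uncommitted.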

Do you have example code that reproduces the issue?

Hey @hyperlink, I have found the issue. It was actually solved in this PR: https://github.com/SOHU-Co/kafka-node/pull/1066

Any idea when the next release will happen?

P.S. I appreciate the help.

Published as 3.0.1

Perfect, thank you.

FYI, you also need my PR before you can reliably use ConsumerGroup(Stream): https://github.com/SOHU-Co/kafka-node/pull/1072

Hopefully it will get merged and published soon.

It sounds like the root issue for this ticket is resolved, but note that you will receive duplicate messages until my PR lands.
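
Until then, one general mitigation (a common pattern, not something proposed in this thread) is to make processing idempotent or to skip replayed offsets. The sketch below shows the latter with a hypothetical in-memory `OffsetDeduper`; because its state lives only in memory, it can only drop duplicates delivered within one process lifetime, not messages replayed after a restart.

```javascript
// Hypothetical dedup guard, not part of kafka-node: remembers the highest
// offset processed per topic/partition and reports replayed messages.
// State is in memory only, so it does not survive a process restart.
class OffsetDeduper {
  constructor () {
    this.highest = new Map() // "topic:partition" -> highest offset seen
  }

  // Returns true the first time an offset is seen for its partition,
  // false for anything at or below the highest offset already processed.
  isNew (message) {
    const key = `${message.topic}:${message.partition}`
    const seen = this.highest.get(key)
    if (seen !== undefined && message.offset <= seen) return false
    this.highest.set(key, message.offset)
    return true
  }
}

const deduper = new OffsetDeduper()
const batch = [
  { topic: 'contacts.mutate', partition: 0, offset: 1 },
  { topic: 'contacts.mutate', partition: 0, offset: 2 },
  { topic: 'contacts.mutate', partition: 0, offset: 1 } // replayed duplicate
]
const processed = batch.filter(m => deduper.isNew(m)).map(m => m.offset)
console.log(processed) // [ 1, 2 ]
```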
