Node-rdkafka: Cannot consume from 'earliest' offset.

Created on 9 Jan 2018 · 13 Comments · Source: Blizzard/node-rdkafka

Hello!

I've created a very basic consumer and I want it to consume from the beginning of the topic every time it starts. However, it doesn't seem to be working: it always consumes from latest. I know there are a number of messages on the topic, and this happens even with a brand new group.id.

This is my consumer code:

const Kafka = require('node-rdkafka');

let brokers = 'localhost:12345';
let topic = 'test';
const consumer = new Kafka.KafkaConsumer({
    'group.id': 'my-app-name',
    'metadata.broker.list': brokers,
    'enable.auto.commit': false, // don't commit my offset
    'auto.offset.reset': 'earliest' // consume from the start
}, {});

consumer.connect();

consumer
  .on('ready', function() {
    console.log('connected');
    consumer.subscribe([topic]);
    consumer.consume();
  })
  .on('data', function(data) {
    // Output the actual message contents
    console.log(data.value.toString());
  });

I thought I was using the correct configuration as per the librdkafka config README, but I may be wrong.

TIA!

EDIT: Using version 2.2.1 on node 8.9.4

All 13 comments

Can you try using kafkacat to consume off the same broker from the beginning and ensure that the messages you expect are there? What you have there should definitely work, and I do it all the time :P

Make sure you're testing with a fresh group id, in case a commit came through. It may also help to check the broker status for the group you're using to make sure it has not committed somehow.

Hey @webmakersteve, thanks for the response.

I'm running this against a local cluster using https://github.com/wurstmeister/kafka-docker.

I'm still seeing the same issue and I've added a Date.now() to the setting of group.id to ensure it's unique every time it starts. I have had this working previously using the same consumer options.
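A minimal sketch of the "unique group.id on every start" approach described above. The helper name uniqueGroupId is hypothetical, and the broker address is the placeholder from the original post; only the Date.now() suffix comes from the thread.

```javascript
// Hypothetical helper: builds a group.id that differs on every start,
// so no previously committed offset can apply to the new group.
function uniqueGroupId(prefix) {
  return prefix + '-' + Date.now();
}

// Global (consumer-level) configuration using the generated id.
// The broker address is the placeholder from the original post.
const globalConfig = {
  'group.id': uniqueGroupId('my-app-name'),
  'metadata.broker.list': 'localhost:12345',
  'enable.auto.commit': false // don't commit my offset
};

console.log(globalConfig['group.id']);
```

A fresh group only rules out committed offsets as the cause. Where the consumer starts is still decided by auto.offset.reset.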

I've run the app twice. The first run started with no messages on the topic, and I then produced to it using the console producer (the numbers you can see). The second run was to see if the consumer started from the beginning:

[root@c541a7da2027 app]# BROKERS='$IP:32776,$IP:32775,$IP:32774' TOPIC='Topic1' node extract.js
connected
1
2
3
4
5
6
^C
[root@c541a7da2027 app]# BROKERS='$IP:32776,$IP:32775,$IP:32774' TOPIC='Topic1' node extract.js
connected
^C
[root@c541a7da2027 app]#

I've checked the offsets for the group (while connected) and it's definitely not committing:

bash-4.3# ./kafka-consumer-groups.sh --bootstrap-server=`broker-list.sh` --describe --group my-app-name-15156158955

Note: This will not show information about old Zookeeper-based consumers.


TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID   HOST   CLIENT-ID
bash-4.3#

I've run kafkacat and found that the messages are indeed there:

root@73906821f20f:/# kafkacat -b '$IP:32776,$IP:32775,$IP:32774' -C -t Topic1 -o beginning
1
2
3
4
5
6
% Reached end of topic Topic1 [0] at offset 6

I wonder if it could be down to how I'm compiling node-rdkafka? The below is my Dockerfile:

FROM centos:7

RUN curl --silent --location https://rpm.nodesource.com/setup_8.x | bash - && \
    yum install -y epel-release && \
    yum update -y && yum install -y \
    gcc-c++ make openssl-devel libsasl2-devel nodejs lz4 lz4-devel

COPY ./src /app

WORKDIR /app

RUN WITH_SASL=0 npm install

(all that's in /app is the script above.)

Let me know if you need anything else from me!

Thanks again.

Can confirm it's not just inside the docker container: it happens on my local machine too (rm -rf'd my node_modules and reinstalled locally):

➜  src git:(master) ✗ BROKERS='$IP:32776,$IP:32775,$IP:32774' TOPIC='Topic1' node extract.js
connected
^C
➜  src git:(master) ✗

Edit: Local node version is 7.9.0

For what it's worth, I am seeing the same issue.

Edit:
When I run kafkacat -b 'broker:9092' -G group topic I don't get any old messages. I only get messages created after running the command.
I assume it's using librdkafka behind the scenes, so it should be the same problem we're seeing here, right?
Replacing group with another group id still gives me no old messages. I also tried using -o beginning, and still got no messages.

I am however getting the messages if I do kafkacat -b 'broker:9092' -C -t topic -o beginning.

Have you enabled debug output? I suspect you'll see an explanation of why it's not fetching with options debug=all and event_cb=true coupled with an event.log handler.
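A rough sketch of the debug setup suggested above. The debug and event_cb options and the event.log event are the ones named in this comment; the collectLogs helper and the facility filter are hypothetical, and a plain EventEmitter stands in for the consumer so the snippet runs without a broker.

```javascript
const { EventEmitter } = require('events');

// Extra global-config entries that switch on librdkafka debug logging.
// 'cgrp,fetch' is an illustrative choice; 'all' is the noisiest setting.
const debugConfig = {
  'debug': 'cgrp,fetch',
  'event_cb': true
};

// Hypothetical helper: listens for 'event.log' on any emitter and keeps
// only entries whose facility is in the given list (empty list = keep all).
function collectLogs(emitter, facilities) {
  const seen = [];
  emitter.on('event.log', function(log) {
    if (facilities.length === 0 || facilities.includes(log.fac)) {
      seen.push(log);
      console.log(log.severity, log.fac, log.message);
    }
  });
  return seen;
}

// Stand-in for a KafkaConsumer, assumed to emit the same log shape
// ({ severity, fac, message }) as the output quoted in this thread.
const fakeConsumer = new EventEmitter();
const logs = collectLogs(fakeConsumer, ['OFFSET']);

fakeConsumer.emit('event.log', {
  severity: 7,
  fac: 'OFFSET',
  message: '[thrd:main]: Topic Topic1 [0]: setting default offset INVALID'
});
fakeConsumer.emit('event.log', {
  severity: 7,
  fac: 'FETCH',
  message: 'made-up entry that the filter drops'
});
```

With a real consumer, the entries of debugConfig would be merged into the first constructor argument and collectLogs would be called on the consumer before connect().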

Hi @erik-stephens,

I have done as requested. Initially, I set debug to all and that was very noisy. I slimmed it down to just cgrp and saw this come out in the console:

{ severity: 7,
  fac: 'OFFSET',
  message: '[thrd:main]: Topic Topic1 [0]: setting default offset INVALID' }

I thought that to be particularly interesting.

I don't think that's an error msg [1][2]. Does adding fetch to debug show anything interesting? It might help to compare kafkacat's debug output (-d) with your program's output to help see why it's behaving so differently.

I've been looking through the source code for kafkacat and I can't find any place where the -o beginning option is used together with the -G (kafkaconsumer_run) option.
It only seems to be used with the -C (consumer_run) option.

I'm also seeing quite a few of the offset INVALID messages, if that helps.

Oh my goodness. I've been a complete idiot.

auto.offset.reset is a _TOPIC_ config, not consumer config.

So when creating an instance of the KafkaConsumer, auto.offset.reset should be passed in the _second_ set of arguments.

So my code in the original should look like this:

const Kafka = require('node-rdkafka');

let brokers = 'localhost:12345';
let topic = 'test';
const consumer = new Kafka.KafkaConsumer({
    'group.id': 'my-app-name',
    'metadata.broker.list': brokers,
    'enable.auto.commit': false, // don't commit my offset
}, {
    'auto.offset.reset': 'earliest' // consume from the start
});

consumer.connect();

consumer
  .on('ready', function() {
    console.log('connected');
    consumer.subscribe([topic]);
    consumer.consume();
  })
  .on('data', function(data) {
    // Output the actual message contents
    console.log(data.value.toString());
  });
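One way to guard against this mistake is to split a flat options object into the two constructor arguments before creating the consumer. This is only a sketch: splitConfig and TOPIC_LEVEL_KEYS are hypothetical names, and the key list holds only the one property discussed here, not every topic-level property.

```javascript
// Illustrative, non-exhaustive list of topic-level properties.
// Only the one discussed in this thread is included.
const TOPIC_LEVEL_KEYS = ['auto.offset.reset'];

// Hypothetical helper: separates a flat options object into the
// global config (first argument) and topic config (second argument).
function splitConfig(flat) {
  const globalConfig = {};
  const topicConfig = {};
  Object.keys(flat).forEach(function(key) {
    if (TOPIC_LEVEL_KEYS.includes(key)) {
      topicConfig[key] = flat[key];
    } else {
      globalConfig[key] = flat[key];
    }
  });
  return { globalConfig: globalConfig, topicConfig: topicConfig };
}

const parts = splitConfig({
  'group.id': 'my-app-name',
  'metadata.broker.list': 'localhost:12345',
  'enable.auto.commit': false,
  'auto.offset.reset': 'earliest'
});

// With node-rdkafka installed, the two parts map onto the constructor:
// const consumer = new Kafka.KafkaConsumer(parts.globalConfig, parts.topicConfig);
console.log(parts);
```

Keeping the list explicit means any property not named in it stays in the global config, so the helper is only as complete as the list it is given.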

My sincerest of apologies.

I've completely missed that as well. Thanks @ls12styler for that! I was going crazy testing back and forth :D

No worries. Sorry for being a n00b :/

Jeez, the same thing happened to me.

And me. Thanks for being the newb who then posted the fix @ls12styler
