Kafka-node: ConsumerGroup does not receive messages in 2.6.1

Created on 12 Jul 2018  路  12Comments  路  Source: SOHU-Co/kafka-node

Bug Report

When using kafka-node version 2.6.1, an instance of the ConsumerGroup successfully emits the 'connect' event; however, consumer.on('message',... does not read any messages from the topic.

When npm install is forced to downgrade to 2.4.1, identical code behaves as expected, and runs the callback for every message received.

Environment

  • Node version: 9.6.1
  • Kafka-node version: 2.6.1
  • Kafka version: 0.11.0.2

Include Sample Code to reproduce behavior

Broken package.json:

{
  "name": "broken-kafka-node",
  "version": "0.0.1",
  "main": "./index.js",
  "dependencies": {
    "express": "^4.16.3",
    "kafka-node": "~2.6.1"
  }
}

Successful package.json:

{
  "name": "broken-kafka-node",
  "version": "0.0.1",
  "main": "./index.js",
  "dependencies": {
    "express": "^4.16.3",
    "kafka-node": "~2.4.1"
  }
}

index.js used in conjunction with both package.json's above:

const kafka = require('kafka-node');

const { ConsumerGroup } = kafka;

const KAFKA_BROKER = "YOUR_BROKER"
const KAFKA_TOPIC = 'YOUR_TEST_TOPIC';
const KAFKA_WATCHER_GROUP = "UNIQUE_WATCHER_GROUP";

console.log(KAFKA_WATCHER_GROUP);
console.log(KAFKA_TOPIC);
console.log(KAFKA_BROKER);

const options = {
  kafkaHost: KAFKA_BROKER,
  groupId: KAFKA_WATCHER_GROUP,
  sessionTimeout: 6000,
  protocol: ['roundrobin'],
  fromOffset: 'latest',
  requestTimeout: false,
};

const consumer = new ConsumerGroup(options, [KAFKA_TOPIC]);

consumer.on('message', async (message) => {
  console.log('message received!');
});

consumer.on('connect', () => {
  console.log('connected');
});

const express = require('express');

const app = express();
app.listen(4567, () => {
  console.log('Listening on port 4567');
});

All 12 comments

Try to confighost instead of kafkaHost in options:

host: this.sysConfig.ZOOKEEPER_HOST,

@jnotnull that is not valid. When using KafkaClient/ConsumerGroup, you only worry about kafka host. You don't ever directly connect to Zookeeper.

@NealHumphrey can you run it with DEBUG="kafka-node:*" node index.js

@aikar Use host config connect to zookeeper can avoid his problem.

@jnotnull no, that's not how you fix things.

A modern kafka stack doesn't connect your application to Zookeeper for CG at all. It works fine for everyone else, so that's not the correct solution.

A consumer group that stores consumer offsets in kafka (not zookeeper) doesn't make sense to connect to zookeeper.

It would probably break things too, as I doubt ZK knows how to handle consumer offset commits into __consumer_offsets?

Just for reference, trying it with host also did not work.

Were you able to reproduce the problem locally with my demo code?

@NealHumphrey. You may want fromOffset: "earliest" if you're trying to consume messages from a topic that no offset have been committed and also no new message will arrive.
I'm new to kafka, and struggle with ConsumerGroup for a long time and finally found that fromOffset setting affects the behaviour how ConsumerGroup deal with a topic with not offset commited yet(offset=-1). And also to my suprise that kafka-node defaults to 'latest' than 'earliest'.
See also: https://stackoverflow.com/questions/48320672/what-is-the-difference-between-kafka-earliest-and-latest-offset-values

@thynson unfortunately that wouldn't solve it for me - I tested it when there were definitely new messages coming into the topic (I created them after turning on the consumer), latest would get those new messages. I consumed other messages on the topic from another consumer (e.g. the 2.4.1 version) so there would have been other offsets created in that consumerGroup.

Let's be clear theres no magic solution here of 'use zookeeper instead of kafka'.

@thynson's idea was logical if this was a new consumer group, but based on @NealHumphrey's response, its not.

It's likely due to https://github.com/SOHU-Co/kafka-node/commit/8a831e3ed786b79c2b37a47bfb568cd131a4d10e, maybe broke something for your specific kafka version.

Though all the unit tests pass...

I will say it's working for me on Kafka 1.1.0 if you want to try updating.

@aikar thanks for the update. I can't update the Kafka version as it's an org-wide managed installation that I'm not in control of.

For now I'm just pegging to 2.4.1 of this library and will try to upgrade when we next update our org kafka version.

@NealHumphrey can you try running it with the debug prefix i mentioned earlier, and scrub any sensitive info from the logs and show them?

that will indicate where its stuck.

@NealHumphrey does this issue also happen in 2.5.0? I wonder if this is related to the fetch versions added in 2.6.0

Likely related to what I fixed here too https://github.com/SOHU-Co/kafka-node/pull/1072

Was this page helpful?
0 / 5 - 0 ratings