Kafka-node: Code not working after 0.5.x upgrade

Created on 2 Sep 2016  路  8Comments  路  Source: SOHU-Co/kafka-node

Starting at 0.5.0, the following test code stopped working. It works in 0.3.2 and 0.4.0.

var kafka = require('kafka-node'),
    HighLevelConsumer = kafka.HighLevelConsumer,
    client = new kafka.Client('localdocker:2181', 'kafkamanualcursor2'),
    consumer = new HighLevelConsumer(
        client,
        [
          {topic: 'XXXX', offset: 0}
        ],
        {
          groupId: 'kafkamanualcursor2',
          autoCommit: false,
          fromOffset: true
        }
    );
setTimeout(function() {
  consumer.setOffset('XXXX', 1, 121);
}, 500);
consumer.on('message', function(msg) {
  console.log(msg);
});
consumer.on('error', function(err) {
  console.log(err);
});
consumer.on('offsetOutOfRange', function(err) {
  console.log(err);
});

The topic looks like this:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localdocker:9092 --topic XXXX --time -1 --offsets 1
XXXX:0:121
XXXX:1:139
XXXX:2:170
XXXX:3:121

When working (0.4.0 and below), onMessage is called with some kafka messages. When it doesn't work (0.5.x), the following is the output with DEBUG=*:

kafka-node:zookeeper Node: /consumers/kafkamanualcursor2/ids/kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 was created. +0ms
  kafka-node:HighLevelConsumer Deregistered listeners +1ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 is attempting to rebalance +2ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 stopping data read during rebalance +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 assembling data for rebalance +1ms
  kafka-node:zookeeper Children are: ["kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44"]. +1ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 releasing current partitions during rebalance +2ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 determining the partitions to own during rebalance +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 gaining ownership of partitions during rebalance +1ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/0 by kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44. +7ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/1 by kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44. +7ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/2 by kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44. +6ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/3 by kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44. +5ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 rebalance attempt was successful +0ms
  kafka-node:Client refresh metadata currentAttempt +0ms 1
  kafka-node:HighLevelConsumer Registered listeners +7ms
bug

All 8 comments

BTW my kafka is running standalone, version 2.10-0.8.2.1 in this test case.

There's an offset related fix that's currently in master can you point kafka-node to master SOHU-Co/kafka-node and see if the issue is resolved?

Hmm.. With master, the debug output is different (looks like 2 rebalances maybe) but still no messages firing in on('message').

DEBUG=* node app.js
  kafka-node:zookeeper Node: /consumers/kafkamanualcursor2/ids/kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 was created. +0ms
  kafka-node:HighLevelConsumer rebalance() kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 is rebalancing: false ready: false +3ms
  kafka-node:HighLevelConsumer Deregistered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +1ms
  kafka-node:HighLevelConsumer Registered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 is attempting to rebalance +1ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 stopping data read during rebalance +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 assembling data for rebalance +1ms
  kafka-node:HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 added a pendingRebalances 1 +0ms
  kafka-node:zookeeper Children are: ["kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812"]. +1ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 releasing current partitions during rebalance +2ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 determining the partitions to own during rebalance +0ms
  kafka-node:HighLevelConsumer consumerPerTopicMap.consumerTopicMap {"kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812":["XXXX"]} +0ms
  kafka-node:HighLevelConsumer newTopicPayloads [{"topic":"XXXX","partition":"0","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"1","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"2","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"3","offset":0,"maxBytes":1048576,"metadata":"m"}] +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 gaining ownership of partitions during rebalance +0ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/0 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +6ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/1 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +6ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/2 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +6ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/3 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +4ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 rebalance attempt was successful +0ms
  kafka-node:Client refresh metadata currentAttempt +1ms 1
  kafka-node:HighLevelConsumer Registered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +6ms
  kafka-node:HighLevelConsumer in fetchOffset kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 payloads: [{"topic":"XXXX","partition":"0","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"1","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"2","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"3","offset":0,"maxBytes":1048576,"metadata":"m"}] +0ms
  kafka-node:HighLevelConsumer Deregistered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +6ms
  kafka-node:HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 pendingRebalances is 1 scheduling a rebalance... +0ms
  kafka-node:HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 running scheduled rebalance +503ms
  kafka-node:HighLevelConsumer rebalance() kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 is rebalancing: false ready: true +0ms
  kafka-node:HighLevelConsumer Deregistered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +0ms
  kafka-node:HighLevelConsumer Registered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 is attempting to rebalance +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 stopping data read during rebalance +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 assembling data for rebalance +1ms
  kafka-node:zookeeper Children are: ["kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812"]. +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 releasing current partitions during rebalance +2ms
  kafka-node:zookeeper Removed partition ownership /consumers/kafkamanualcursor2/owners/XXXX/0 +5ms
  kafka-node:zookeeper Removed partition ownership /consumers/kafkamanualcursor2/owners/XXXX/1 +0ms
  kafka-node:zookeeper Removed partition ownership /consumers/kafkamanualcursor2/owners/XXXX/2 +1ms
  kafka-node:zookeeper Removed partition ownership /consumers/kafkamanualcursor2/owners/XXXX/3 +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 determining the partitions to own during rebalance +1ms
  kafka-node:HighLevelConsumer consumerPerTopicMap.consumerTopicMap {"kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812":["XXXX"]} +0ms
  kafka-node:HighLevelConsumer newTopicPayloads [{"topic":"XXXX","partition":"0","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"1","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"2","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"3","offset":0,"maxBytes":1048576,"metadata":"m"}] +0ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 gaining ownership of partitions during rebalance +0ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/0 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +6ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/1 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +5ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/2 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +4ms
  kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/3 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +6ms
  kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 rebalance attempt was successful +0ms
  kafka-node:Client refresh metadata currentAttempt +0ms 1
  kafka-node:HighLevelConsumer Registered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +2ms
  kafka-node:HighLevelConsumer in fetchOffset kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 payloads: [{"topic":"XXXX","partition":"0","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"1","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"2","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"3","offset":0,"maxBytes":1048576,"metadata":"m"}] +0ms
  kafka-node:HighLevelConsumer Deregistered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +1ms

Are you running into the same issue with kafka 0.8.2.2 ?

Okay in 0.8.2.2 it seems to deliver all the messages (since offset 0, no matter what offset I specify) but at least it is delivering messages. Maybe I have the syntax wrong for specifying which offset to start from.

Hi, have similar problem with Highlevelconsumer with kafka 0.9.0.1. The problem has gone after upgrade kafka-node library from 0.5.6 to 0.5.7 version.

Hi @rcarmich are you still running into issues with the latest version?

Actually 0.5.7 seems to be working great. Thank you.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

harshitgupta30 picture harshitgupta30  路  4Comments

cesaraugustogarcia picture cesaraugustogarcia  路  3Comments

chetandev picture chetandev  路  5Comments

twawszczak picture twawszczak  路  6Comments

juhanishen picture juhanishen  路  7Comments