Starting at 0.5.0, the following test code stopped working. It works in 0.3.2 and 0.4.0.
var kafka = require('kafka-node'),
HighLevelConsumer = kafka.HighLevelConsumer,
client = new kafka.Client('localdocker:2181', 'kafkamanualcursor2'),
consumer = new HighLevelConsumer(
client,
[
{topic: 'XXXX', offset: 0}
],
{
groupId: 'kafkamanualcursor2',
autoCommit: false,
fromOffset: true
}
);
setTimeout(function() {
consumer.setOffset('XXXX', 1, 121);
}, 500);
consumer.on('message', function(msg) {
console.log(msg);
});
consumer.on('error', function(err) {
console.log(err);
});
consumer.on('offsetOutOfRange', function(err) {
console.log(err);
});
The topic looks like this:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localdocker:9092 --topic XXXX --time -1 --offsets 1
XXXX:0:121
XXXX:1:139
XXXX:2:170
XXXX:3:121
When working (0.4.0 and below), onMessage is called with some kafka messages. When it doesn't work (0.5.x), the following is the output with DEBUG=*:
kafka-node:zookeeper Node: /consumers/kafkamanualcursor2/ids/kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 was created. +0ms
kafka-node:HighLevelConsumer Deregistered listeners +1ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 is attempting to rebalance +2ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 stopping data read during rebalance +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 assembling data for rebalance +1ms
kafka-node:zookeeper Children are: ["kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44"]. +1ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 releasing current partitions during rebalance +2ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 determining the partitions to own during rebalance +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 gaining ownership of partitions during rebalance +1ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/0 by kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44. +7ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/1 by kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44. +7ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/2 by kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44. +6ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/3 by kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44. +5ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_62a915a6-9524-46c3-ba92-4af5f0413c44 rebalance attempt was successful +0ms
kafka-node:Client refresh metadata currentAttempt +0ms 1
kafka-node:HighLevelConsumer Registered listeners +7ms
BTW my kafka is running standalone, version 2.10-0.8.2.1 in this test case.
There's an offset related fix that's currently in master can you point kafka-node to master SOHU-Co/kafka-node and see if the issue is resolved?
Hmm.. With master, the debug output is different (looks like 2 rebalances maybe) but still no messages firing in on('message').
DEBUG=* node app.js
kafka-node:zookeeper Node: /consumers/kafkamanualcursor2/ids/kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 was created. +0ms
kafka-node:HighLevelConsumer rebalance() kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 is rebalancing: false ready: false +3ms
kafka-node:HighLevelConsumer Deregistered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +1ms
kafka-node:HighLevelConsumer Registered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 is attempting to rebalance +1ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 stopping data read during rebalance +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 assembling data for rebalance +1ms
kafka-node:HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 added a pendingRebalances 1 +0ms
kafka-node:zookeeper Children are: ["kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812"]. +1ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 releasing current partitions during rebalance +2ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 determining the partitions to own during rebalance +0ms
kafka-node:HighLevelConsumer consumerPerTopicMap.consumerTopicMap {"kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812":["XXXX"]} +0ms
kafka-node:HighLevelConsumer newTopicPayloads [{"topic":"XXXX","partition":"0","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"1","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"2","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"3","offset":0,"maxBytes":1048576,"metadata":"m"}] +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 gaining ownership of partitions during rebalance +0ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/0 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +6ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/1 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +6ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/2 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +6ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/3 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +4ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 rebalance attempt was successful +0ms
kafka-node:Client refresh metadata currentAttempt +1ms 1
kafka-node:HighLevelConsumer Registered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +6ms
kafka-node:HighLevelConsumer in fetchOffset kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 payloads: [{"topic":"XXXX","partition":"0","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"1","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"2","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"3","offset":0,"maxBytes":1048576,"metadata":"m"}] +0ms
kafka-node:HighLevelConsumer Deregistered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +6ms
kafka-node:HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 pendingRebalances is 1 scheduling a rebalance... +0ms
kafka-node:HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 running scheduled rebalance +503ms
kafka-node:HighLevelConsumer rebalance() kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 is rebalancing: false ready: true +0ms
kafka-node:HighLevelConsumer Deregistered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +0ms
kafka-node:HighLevelConsumer Registered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 is attempting to rebalance +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 stopping data read during rebalance +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 assembling data for rebalance +1ms
kafka-node:zookeeper Children are: ["kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812"]. +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 releasing current partitions during rebalance +2ms
kafka-node:zookeeper Removed partition ownership /consumers/kafkamanualcursor2/owners/XXXX/0 +5ms
kafka-node:zookeeper Removed partition ownership /consumers/kafkamanualcursor2/owners/XXXX/1 +0ms
kafka-node:zookeeper Removed partition ownership /consumers/kafkamanualcursor2/owners/XXXX/2 +1ms
kafka-node:zookeeper Removed partition ownership /consumers/kafkamanualcursor2/owners/XXXX/3 +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 determining the partitions to own during rebalance +1ms
kafka-node:HighLevelConsumer consumerPerTopicMap.consumerTopicMap {"kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812":["XXXX"]} +0ms
kafka-node:HighLevelConsumer newTopicPayloads [{"topic":"XXXX","partition":"0","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"1","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"2","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"3","offset":0,"maxBytes":1048576,"metadata":"m"}] +0ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 gaining ownership of partitions during rebalance +0ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/0 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +6ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/1 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +5ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/2 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +4ms
kafka-node:zookeeper Gained ownership of /consumers/kafkamanualcursor2/owners/XXXX/3 by kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812. +6ms
kafka-node:HighLevelConsumer HighLevelConsumer kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 rebalance attempt was successful +0ms
kafka-node:Client refresh metadata currentAttempt +0ms 1
kafka-node:HighLevelConsumer Registered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +2ms
kafka-node:HighLevelConsumer in fetchOffset kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 payloads: [{"topic":"XXXX","partition":"0","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"1","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"2","offset":0,"maxBytes":1048576,"metadata":"m"},{"topic":"XXXX","partition":"3","offset":0,"maxBytes":1048576,"metadata":"m"}] +0ms
kafka-node:HighLevelConsumer Deregistered listeners kafkamanualcursor2_4b95485e-9fe2-4291-83ba-e990e9e51812 +1ms
Are you running into the same issue with kafka 0.8.2.2 ?
Okay in 0.8.2.2 it seems to deliver all the messages (since offset 0, no matter what offset I specify) but at least it is delivering messages. Maybe I have the syntax wrong for specifying which offset to start from.
Hi, have similar problem with Highlevelconsumer with kafka 0.9.0.1. The problem has gone after upgrade kafka-node library from 0.5.6 to 0.5.7 version.
Hi @rcarmich are you still running into issues with the latest version?
Actually 0.5.7 seems to be working great. Thank you.