Hi there,
we are currently having problems running "kafka-node" in our production system.
Info:
Implementation:
Problem:
Questions:
Configuration:
ssl: false,
groupId: "a-group-id",
sessionTimeout: 30000,
protocol: ["roundrobin"],
fromOffset: "earliest",
migrateHLC: false,
migrateRolling: false,
fetchMaxBytes: 1024 * 100,
fetchMinBytes: 1,
fetchMaxWaitMs: 100,
autoCommit: false,
autoCommitIntervalMs: 5000
We have hit some sort of wall here, thanks for any guesses in advance.
Chris
@holgeradam
Hi @krystianity a low fetchMaxBytes could cause a consumer to hang.
Hi @hyperlink, thanks for the quick reply. We have already tried different settings for "fetchMaxBytes", ranging from 5 KB to 10 MB. Fun fact: reducing the size results in faster message consumption.
I'll work on a test that consumes a large number of messages.
@hyperlink okay cool, thanks for looking into this. I will check our Kafka logs in a few minutes. We see a few "Connection Close" entries related to a "broken OffsetCommitRequest", but I have to make sure they are coming from my client.
When using autoCommit: true in this scenario, the consumer seems to commit out of order and to commit very high offsets early: about half of the partitions are committed as nearly completed and the other half are set to zero, although the consumer has received messages evenly distributed over all partitions. Looking forward to a test for that scenario. Thanks!
Okay, so I have already identified that a few of these Kafka errors are caused by the client:
[2017-03-10 20:56:01,395] ERROR Closing socket for x.x.x.x:9092-x.x.x.x:46674 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 8 and apiVersion: 2
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'member_id': String length -1 cannot be negative
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
at org.apache.kafka.common.requests.OffsetCommitRequest.parse(OffsetCommitRequest.java:260)
However, I don't think this is the main problem, as these errors only occur sporadically: about 3-4 of them per thousand commits.
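For context on the broker error above: apiKey 8 is the OffsetCommit request in the Kafka protocol, and the protocol writes a string as a signed 16-bit big-endian length followed by the bytes, with a length of -1 marking a null string. One plausible reading, not confirmed in this thread, is that the client occasionally serialized an unset memberId, which the broker then rejected because it expects a real string in that field. The sketch below only illustrates the encoding rule; the function names encodeNullableString and decodeRequiredString are hypothetical and are not part of kafka-node.

```javascript
'use strict';

// Kafka's wire format prefixes a string with a signed 16-bit big-endian length.
// A null string is written as length -1 with no bytes following.
function encodeNullableString (value) {
  const header = Buffer.alloc(2);
  if (value === null || value === undefined) {
    header.writeInt16BE(-1, 0);
    return header;
  }
  const bytes = Buffer.from(String(value), 'utf8');
  header.writeInt16BE(bytes.length, 0);
  return Buffer.concat([header, bytes]);
}

// A reader for a required (non-nullable) field rejects the -1 marker,
// which mirrors the message the broker logged for 'member_id'.
function decodeRequiredString (buf, fieldName) {
  const length = buf.readInt16BE(0);
  if (length < 0) {
    throw new Error(`Error reading field '${fieldName}': String length ${length} cannot be negative`);
  }
  return buf.toString('utf8', 2, 2 + length);
}
```

If this reading is right, a commit sent before the group join has completed (so before a memberId exists) would be the kind of request that triggers the error.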
I wrote a simple test with a single consumer against a single broker and a single partition. I published 2.1 million messages and was able to consume them without an issue. I ran it several times.
https://github.com/SOHU-Co/kafka-node/commit/283e6a88492168ed86993b683e5fc0868a469e26
If you're interested in running it, check out the large-message-test branch.
You will need to run the test without a timeout, since it can take about 14 minutes to run.
./node_modules/.bin/mocha --no-timeouts test/test.consumerGroup.js --grep "large"
How many consumers are in your group?
What I have found out, though, is that the client always receives the exact same offset in every message; the offsets only differ by partition. Even if I start a fresh consumer group, the very first message I get on each of the 30 partitions carries a number like "135988" for partition "0" or "138391" for partition "1". And these numbers do not change: even after consuming 500k messages, a message on partition 0 still has the offset "135988".
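A minimal sketch of how this observation could be checked from inside a message handler. It assumes only that each message exposes numeric partition and offset fields, as the messages in this thread do; createOffsetWatcher is a hypothetical helper, not something kafka-node provides.

```javascript
'use strict';

// Hypothetical helper: remembers the last offset seen per partition and
// counts messages whose offset did not move forward.
function createOffsetWatcher () {
  const lastSeen = {}; // partition -> highest offset seen so far
  const stalled = {};  // partition -> number of non-advancing messages
  return {
    observe (message) {
      const previous = lastSeen[message.partition];
      if (previous !== undefined && message.offset <= previous) {
        stalled[message.partition] = (stalled[message.partition] || 0) + 1;
      } else {
        lastSeen[message.partition] = message.offset;
      }
    },
    report () {
      return { lastSeen: Object.assign({}, lastSeen), stalled: Object.assign({}, stalled) };
    }
  };
}
```

Calling observe(message) in the 'message' handler and logging report() periodically would show whether a partition keeps delivering the same offset.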
Also: if I use client.getEarliestOffsets(), run through the result and apply client.setOffset() to every partition I received, I would expect to retrieve messages again right from the beginning of the topic. This sometimes works, but about 25% of the time nothing happens at all, and when I query the consumer-group state using Kafka Tool, I can see that the offset of the consumer group has not been reset.
@hyperlink for this use case we are only running a single consumer. I will try out your branch. Could having 30 partitions make a difference?
I can confirm that large-message-test runs successfully on my workstation using a Docker Kafka broker at version 0.10.0.1. Although the broker tells me there is still a lag of "18,921" messages for the test consumer group, the consumer has received all 2.1 million messages. I will get some sleep and take another look at this tomorrow.
@hyperlink Okay, so this problem persists. As mentioned above, I can confirm that your test runs on my machine as well. I have also added a few things to check whether having 30 partitions changes this behavior. It did not; it just made the test a lot faster ;) (as expected).
However I discovered more strange things:
_We grab all partitions from a topic using offset.fetchEarliestOffsets(), then iterate through these partitions and call consumer.setOffset(topic, partition, 0) for each. After that is done, we call consumer.commit()._
I would expect my consumer-group lags to be at their max/end value now. However, they stick with a lag of 0 and do not change at all. Sometimes it works, sometimes it does not. You can check the function that resets our offsets here.
Our latest guess is that the offset trouble is caused by keyed partitioners.
@krystianity updating the offsets in the consumer changes the internal offset state of the consumer (which offsets to fetch next). The offset is automatically advanced after every fetch so changing this manually can lead to consuming duplicate messages. This could explain the high lag.
I think to do this correctly you will need to keep track of offsets outside of the consumer and call sendOffsetCommitRequest instead.
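A minimal sketch of what "keeping track of offsets outside of the consumer" could look like. The tracker itself is plain JavaScript and hypothetical (createCommitTracker is not a kafka-node API). The payload shape of { topic, partition, offset } objects, and the choice to commit the last processed offset plus one (the usual Kafka convention that a committed offset names the next message to read), are assumptions to verify against the library source before relying on them.

```javascript
'use strict';

// Hypothetical tracker that lives outside the consumer, so the consumer's
// internal fetch position is never touched.
function createCommitTracker (topic) {
  const processed = {}; // partition -> highest fully processed offset
  return {
    // Call this once a message has been handled successfully.
    markProcessed (message) {
      const current = processed[message.partition];
      if (current === undefined || message.offset > current) {
        processed[message.partition] = message.offset;
      }
    },
    // Builds one commit entry per partition, pointing at the next offset to read.
    toCommits () {
      return Object.keys(processed).map(function (partition) {
        return { topic: topic, partition: Number(partition), offset: processed[partition] + 1 };
      });
    }
  };
}
```

The array returned by toCommits() is what would then be handed to sendOffsetCommitRequest together with a callback, instead of calling setOffset followed by commit.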
I don't think I have really understood your answer yet. How would you go about making a consumer start consuming from the earliest offset, regardless of its current offset/lag state?
Another thing we have detected while investigating our stopping consumers is that they always stop at the same number of consumed messages:
{
"total": 561539,
"partitions": {
"0": 24243,
"1": 24526,
"2": 7164,
"3": 7364,
"4": 24884,
"5": 24295,
"6": 24158,
"7": 23835,
"8": 24632,
"9": 24202,
"10": 7101,
"11": 24881,
"12": 7209,
"13": 25238,
"14": 7188,
"15": 24106,
"16": 7214,
"17": 24248,
"18": 7353,
"19": 24302,
"20": 23924,
"21": 24434,
"22": 24870,
"23": 24604,
"24": 24528,
"25": 7202,
"26": 23994,
"27": 25452,
"28": 7254,
"29": 7134
}
}
And it does not matter if we use the Consumer, HighLevelConsumer or ConsumerGroups - we pick a fresh GroupID and start to consume at the earliest offset and every attempt is stuck after 561539 messages. (Using different fetchMaxBytes settings does not matter).
(autoCommit or our own custom commits are not active in these tests, we are not committing or setting offsets at all currently.)
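For anyone who wants to gather comparable numbers, here is a minimal sketch of a counter that produces the same shape as the JSON above (a total plus a per-partition count). It is a hypothetical helper, not the code that produced those figures, and it assumes only that each message has a partition field.

```javascript
'use strict';

// Hypothetical counter producing { total, partitions: { "<partition>": count } }.
function createConsumptionStats () {
  const stats = { total: 0, partitions: {} };
  return {
    record (message) {
      const key = String(message.partition);
      stats.total += 1;
      stats.partitions[key] = (stats.partitions[key] || 0) + 1;
    },
    // Returns a copy so callers cannot mutate the running counters.
    snapshot () {
      return { total: stats.total, partitions: Object.assign({}, stats.partitions) };
    }
  };
}
```

Wiring record(message) into the 'message' handler and printing snapshot() when consumption stalls makes it easy to compare runs across group IDs and consumer types.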
OK, I think there are several different scenarios being discussed, and it's getting a bit confusing.
My last message was about manually committing instead of using autoCommit: true. In this case, don't call .setOffset, as this could lead to issues.
The other scenario is to always consume from the earliest available offset. For that, using fromOffset: 'earliest' with a new groupId seems reasonable.
Then there's the issue of the stalled consumer with 30 partitions (and autoCommit off), which can be verified with a test (though with the current Docker setup we can only have one broker).
@krystianity here's the updated test I wrote earlier to consume from 30 partitions w/ autoCommit off. I ran it several times without issues.
I ran the test several times in a row (the test needs to be adjusted for that) without resetting the Kafka container, and was able to consume up to 6.3 million messages in 527 seconds.
Kafka bin tools
./kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 30 --topic ThirtyPartitions
./test/large-cs-test.js
'use strict';

const host = process.env['KAFKA_TEST_HOST'] || '';
const ConsumerGroup = require('../lib/consumerGroup');
const _ = require('lodash');
const uuid = require('uuid');
const async = require('async');
const Client = require('../lib/client');
const Producer = require('../lib/highLevelProducer');
const assert = require('assert');

let topic, client, consumerGroup, consumed, interval;
const numberOfMessages = 2100000;
const chunkSize = 30;

function sendUUIDMessages (times, topic, done) {
  console.log(`Trying to send ${times} messages`);
  const producer = new Producer(client, { requireAcks: 1 });
  assert(producer.ready, 'Producer is not ready');
  console.log('producer ready');
  const allMessages = _.times(times, uuid.v4);
  const chunked = _.chunk(allMessages, times / chunkSize);
  console.log(`${allMessages.length} messages generated. Sending in ${chunked.length} chunks of ${times / chunkSize}`);
  let count = 0;
  async.eachSeries(chunked, function (messages, callback) {
    console.log(`sending ${++count}`);
    producer.send([{topic: topic, messages: messages}], callback);
  }, done);
  return producer;
}

function init (done) {
  topic = 'ThirtyPartitions'; // uuid.v4();
  async.series([
    function (callback) {
      client = new Client(host, uuid.v4());
      client.once('connect', callback);
    },
    function (callback) {
      client.refreshMetadata([topic], callback);
    },
    function (callback) {
      sendUUIDMessages(numberOfMessages, topic, function (error) {
        if (error) {
          return callback(error);
        }
        assert.equal(client.topicPartitions[topic].length, chunkSize);
        callback();
      });
    },
    function (callback) {
      client.close(callback);
    }
  ], function (error) {
    if (error) {
      return done(error);
    }
    console.log('All messages sent');
    done();
  });
}

function bench (done) {
  const groupId = uuid.v4();
  console.log(`starting to consume using groupId: ${groupId}`);
  const time = process.hrtime();
  consumerGroup = new ConsumerGroup({
    fetchMaxBytes: 1024 * 100,
    groupId: groupId,
    host: host,
    sessionTimeout: 8000,
    heartbeatInterval: 7000,
    retryMinTimeout: 250,
    autoCommit: false,
    fromOffset: 'earliest'
  }, topic);
  const stats = {};
  consumed = 0;
  consumerGroup.once('error', done);
  consumerGroup.on('message', function (message) {
    stats[message.partition] = message.offset;
    if (++consumed === numberOfMessages) {
      console.log(`consumed ${consumed}`);
      const [seconds, nanoseconds] = process.hrtime(time);
      console.log(`took ${seconds}s ${nanoseconds / 1e6}ms`);
      console.log(stats);
      clearInterval(interval);
      done();
    }
  });
  consumerGroup.once('connect', function () {
    interval = setInterval(function () {
      console.log(`consumed ${consumed}`);
    }, 1000);
  });
}

async.series([init, bench], function (error) {
  if (error) {
    console.error('failed with', error);
  }
  client.close();
  consumerGroup.close(function () {});
});
@hyperlink thanks again for your support and your efforts to ensure trust in this lib. We were able to identify the reasons for the problems, and I am happy to sum them up for anyone else who might be stuck in a similar situation in the future:
- Using 1 or 30 partitions does not matter, and neither does having a single instance or a cluster. We ran the large-message-test against all kinds of setups.
- Combining .setOffset with manual consumer.commit() calls is a bad idea.
- Expecting .setOffset(topic, partition, 0) to work on a running consumer, even a paused one, can be problematic and is probably bad practice for any kind of Kafka consumer.
- When you are "experiencing missing messages" and "committed high offsets", make sure you understand the topic you are consuming. We have a large number of topics here, with different owners. Topics with keyed messages may well be subject to heavy log compaction, depending on the data and the frequency of publishes the topic goes through.
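The log compaction point can be made concrete with a minimal sketch. On a compacted topic, older records for a key are removed but the surviving records keep their original offsets, so the offset range a consumer walks through can be much larger than the number of messages it actually receives. The hypothetical helper below (summarizeOffsetGaps is not part of kafka-node) compares both per partition, assuming only that each message has numeric partition and offset fields.

```javascript
'use strict';

// Hypothetical helper: for each partition, compare how many messages were
// received with the offset range they span. A large "missing" value hints at
// compaction (or retention) having removed records in between.
function summarizeOffsetGaps (messages) {
  const byPartition = {};
  messages.forEach(function (message) {
    let entry = byPartition[message.partition];
    if (!entry) {
      entry = byPartition[message.partition] = {
        received: 0,
        first: message.offset,
        last: message.offset
      };
    }
    entry.received += 1;
    entry.first = Math.min(entry.first, message.offset);
    entry.last = Math.max(entry.last, message.offset);
  });
  Object.keys(byPartition).forEach(function (partition) {
    const entry = byPartition[partition];
    entry.span = entry.last - entry.first + 1;       // offsets covered
    entry.missing = entry.span - entry.received;     // offsets never delivered
  });
  return byPartition;
}
```

A partition that reports far fewer received messages than its span is consistent with compaction, which would also explain high committed offsets next to a low message count.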