We have a queue of items coming in at around 100 messages/s for topic 'topic_name'.
We tried the stream API and the standard API (flowing and non-flowing), but the client doesn't seem to process new items quickly enough, so lag builds up. At startup it processes the backlog of old events at around 5,000-20,000 messages/s, but once it catches up to new data, throughput drops to around 1-14 messages/s.
With no-kafka, throughput is stable at 100 messages/s and no lag builds up.
Is there any config we should set to speed this up? Also, the topic config 'auto.offset.reset': 'latest' (or 'auto.offset.reset': 'end') doesn't seem to take effect: we're still getting data from the offset committed for our group.id.
Here is the sample stream API code we're using (we also tried the standard flowing and non-flowing APIs with the same result):
const stream = require('stream');
const RDKafka = require('node-rdkafka');
const consumerConfig = {
  'group.id': 'testgroup-1',
  'metadata.broker.list': [<list of broker here>],
};
const consumerTopicConfig = {
  'auto.offset.reset': 'latest', // doesn't seem to work, we're getting data from group.id offset
};
const consumer = new RDKafka.KafkaConsumer(consumerConfig, consumerTopicConfig);
consumer
  .on('ready', () => { console.log('Connected'); })
  .on('error', (err) => { console.error('Client error', err); })
  .on('event.error', (err) => { console.error('Client (stream) error', err); })
  .on('disconnected', () => { console.log('disconnected'); });
const readStream = consumer.getReadStream('topic_name');
readStream
  .on('error', (err) => { console.error('Stream error', err); });
function disconnect() {
  consumer.once('disconnected', () => { process.exit(0); });
  consumer.disconnect();
}
const writeStream = new stream.Writable({
  objectMode: true,
  write: (data, encoding, cb) => {
    // here, we are getting the data but too slowly
    // logData is an application-defined logging helper (not shown)
    logData({
      data: data.value.toString(),
      offset: data.offset,
      partition: data.partition,
      topic: data.topic,
    });
    cb();
  },
});
// start
readStream
  .pipe(writeStream);
process.on('SIGINT', () => { console.log('SIGINT'); disconnect(); });
process.on('uncaughtException', (err) => { console.log('uncaughtException', err); disconnect(); });
The offset reset will only work if there are no committed offsets for a partition. If there are, it will use what is committed.
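The rule can be sketched as a small decision function. This is a simplified model, not librdkafka's code: the helper `resolveStartOffset`, its arguments, and the watermark numbers are hypothetical. Besides the no-commit case, librdkafka also applies the reset policy when the committed offset is out of range, which the model includes.

```javascript
// Simplified model (hypothetical helper, not librdkafka source) of how a
// consumer group picks its start offset for one partition.
// committed: the group's committed offset, or null if none exists.
// low/high: the partition's low and high watermarks.
function resolveStartOffset(committed, policy, low, high) {
  const inRange = committed !== null && committed >= low && committed <= high;
  if (inRange) {
    // A valid committed offset always wins; auto.offset.reset is ignored.
    return committed;
  }
  // No commit, or an out-of-range one: fall back to the reset policy.
  switch (policy) {
    case 'earliest':
    case 'smallest':
    case 'beginning':
      return low;
    case 'latest':
    case 'largest':
    case 'end':
      return high;
    default:
      // 'error' (or an unknown value): no automatic reset in this model.
      throw new Error(`no valid committed offset and policy is ${policy}`);
  }
}

// testgroup-1 has already committed offset 500, so 'latest' has no effect.
console.log(resolveStartOffset(500, 'latest', 0, 9000)); // 500
// A group.id with no commit starts at the end under 'latest'.
console.log(resolveStartOffset(null, 'latest', 0, 9000)); // 9000
```

In practice this means starting from the end requires either a group.id with no committed offsets or resetting the group's committed offsets (for example with Kafka's `kafka-consumer-groups.sh --reset-offsets --to-latest`).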
The only thing I can recommend trying on the client side for debugging these issues is adding 'debug': 'all' to your configuration and checking how often it is doing fetches.
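A minimal sketch of enabling that logging without editing the original config object. The helper `withDebug` and the broker address are hypothetical placeholders. 'all' is very noisy, so the sketch defaults to the narrower librdkafka debug contexts `cgrp` (consumer group) and `fetch`, assuming your librdkafka version supports them.

```javascript
// Hypothetical helper: returns a copy of a node-rdkafka consumer config with
// librdkafka debug logging turned on for the given contexts.
function withDebug(config, contexts = 'cgrp,fetch') {
  return { ...config, debug: contexts };
}

const consumerConfig = {
  'group.id': 'testgroup-1',
  'metadata.broker.list': 'broker1:9092', // placeholder, use your own brokers
};

const debugConfig = withDebug(consumerConfig);
console.log(debugConfig.debug); // cgrp,fetch
```

The resulting object would then be passed to `new KafkaConsumer(...)`; node-rdkafka surfaces the debug lines through its 'event.log' event (e.g. `consumer.on('event.log', (log) => console.log(log))`), which makes it possible to see how often fetches happen.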
For your consumer, add this and see what happens:
const readStream = consumer.getReadStream('topic_name', { waitInterval: 0, fetchSize: 16 });
Thanks! It seems like waitInterval: 0 fixed the lag problem. Adding fetchSize caused another problem, but we didn't look into it in detail. We'll run it for a couple of days to see if it holds up.
On topics with highly important data and low volume, you will also want to set your consumer timeout lower so it fetches quicker instead of backing off when it gets no data:
consumer.setDefaultConsumeTimeout(100); // ms
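Both suggestions for a low-volume topic can be combined in one place. This is a sketch: `setDefaultConsumeTimeout` and `getReadStream(topic, { waitInterval: 0 })` are the calls quoted in this thread, while the helper name `openLowLatencyStream` and the stub consumer (used so the sketch runs without a broker) are hypothetical.

```javascript
// Hypothetical helper applying both tweaks from this thread. `consumer` is
// assumed to be a node-rdkafka KafkaConsumer (or anything with these methods).
function openLowLatencyStream(consumer, topic, consumeTimeoutMs = 100) {
  // Shorter default timeout for each consume call.
  consumer.setDefaultConsumeTimeout(consumeTimeoutMs);
  // waitInterval: 0 means no extra sleep after a read that returns nothing.
  return consumer.getReadStream(topic, { waitInterval: 0 });
}

// Stub standing in for a real KafkaConsumer; it only records the calls made.
const calls = [];
const fakeConsumer = {
  setDefaultConsumeTimeout(ms) { calls.push(['timeout', ms]); },
  getReadStream(topic, opts) { calls.push(['stream', topic, opts]); return { topic, opts }; },
};

const s = openLowLatencyStream(fakeConsumer, 'topic_name');
console.log(calls.map((c) => c[0]).join(' -> ')); // timeout -> stream
```

The timeout is set before the stream is created so that the stream's first reads already use it.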
It's been running for a couple of days now with no lag.
const readStream = consumer.getReadStream('topic_name', { waitInterval: 0 });
This fixed it for us.
From what I see in the code, waitInterval only seems to matter when a read returns zero messages. In my case, however, messages on the topic were piling up to 7,000, yet the consumer was still running at about 1 message/s. Why does it slow the consume process down that much?
PS: waitInterval: 0 also solved my problem, but I don't understand why it behaves this way.
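The effect of a back-off on empty reads can be illustrated with a toy model of a poll loop. This is not node-rdkafka's implementation: the 10 ms arrival spacing (about 100 messages/s), the 1 ms poll cost, one message per poll, and the 1000 ms back-off are all assumptions. The model shows lag spikes after each empty poll, not the ~1 message/s collapse described above, so it illustrates the mechanism rather than confirming the cause.

```javascript
// Toy model (not node-rdkafka code): messages arrive every `arrivalMs`,
// each poll takes `pollMs` and returns at most `fetchSize` messages, and an
// empty poll is followed by a sleep of `waitInterval` ms.
function simulate({ waitInterval, durationMs = 5000, arrivalMs = 10, pollMs = 1, fetchSize = 1 }) {
  let t = 0;
  let consumed = 0;
  let maxLag = 0;
  while (t < durationMs) {
    const arrived = Math.floor(t / arrivalMs) + 1; // messages produced so far
    const available = arrived - consumed;
    maxLag = Math.max(maxLag, available);
    if (available > 0) {
      consumed += Math.min(fetchSize, available);
      t += pollMs;
    } else {
      // Nothing to read: the only place waitInterval has any effect.
      t += pollMs + waitInterval;
    }
  }
  return { consumed, maxLag };
}

console.log(simulate({ waitInterval: 1000 }));
console.log(simulate({ waitInterval: 0 }));
```

In this model waitInterval: 0 keeps the lag at one message, while a 1000 ms back-off lets about a second's worth of messages pile up after every empty poll. If the real stream hits empty reads while data is still flowing, the same back-off would be paid repeatedly; that is a plausible contributor, not a verified explanation.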