Does anyone have a ready-to-use example of a consumer with BACK PRESSURE? I read in the docs that I can use the async library, but I still don't really understand where to put the consumer.pause() call. This could also be added to the docs, since I think it's the most common use case. Thanks!
Your message handler should call consumer.pause() and push the message to the queue.
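Here is a minimal, dependency-free sketch of that pattern: pause on every message, hand the message to a bounded worker pool, and resume only once the pool is idle. `Pausable` and `withBackPressure` are hypothetical names, not part of kafka-node or async. The small worker pool stands in for async's `queue`, and any object with `pause()`/`resume()` (such as a `HighLevelConsumer`) could play the role of the source. This version pauses *before* queuing the message, so no work starts while the source is still flowing.

```typescript
// Anything that can stop and restart message delivery.
interface Pausable {
  pause(): void;
  resume(): void;
}

// Hypothetical helper: returns a 'message' listener that applies back pressure.
// At most `concurrency` messages are handled at once; the source is resumed
// only after every queued message has called done().
function withBackPressure<T>(
  source: Pausable,
  concurrency: number,
  handle: (msg: T, done: () => void) => void
): (msg: T) => void {
  const backlog: T[] = [];
  let running = 0;

  const startNext = (): void => {
    while (running < concurrency && backlog.length > 0) {
      const msg = backlog.shift()!;
      running++;
      let finished = false;
      handle(msg, () => {
        if (finished) return; // ignore duplicate done() calls
        finished = true;
        running--;
        if (running === 0 && backlog.length === 0) {
          source.resume(); // all work finished: let the source deliver more
        } else {
          startNext();
        }
      });
    }
  };

  return (msg: T) => {
    source.pause(); // stop fetching while there is outstanding work
    backlog.push(msg);
    startNext();
  };
}
```

A real consumer may still emit a few already-fetched messages after `pause()`. The backlog simply absorbs them, and `resume()` is only called once all of them are done.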
OK, so I wrote the following snippet:
```typescript
import config from "./Config";
import logger from "./Logger";
import { queue } from 'async';
import { Client, HighLevelConsumer, Offset } from 'kafka-node';

export class Consumer {
  client: Client;
  consumer: HighLevelConsumer;
  offset: Offset;

  // Creates a client/consumer pair and attaches all listeners to it. Called on
  // startup and again when the consumer is recreated, so the new consumer gets
  // the same listeners as the first one.
  private connect(topic: string, onMessage: (messageWrapper: any) => void): void {
    const self = this;
    self.client = new Client(config.kafka.connectionString, config.kafka.clientId);
    self.consumer = new HighLevelConsumer(self.client, [{ topic: topic }], { groupId: config.kafka.clientId });
    self.offset = new Offset(self.client);

    self.consumer.on('message', onMessage);

    self.consumer.on('error', (err: any) => {
      const failedToRebalanceConsumerError = err.message && err.message.includes('FailedToRebalanceConsumerError');
      const leaderNotAvailable = err.message && err.message.includes('LeaderNotAvailable');
      if (failedToRebalanceConsumerError || leaderNotAvailable) {
        // Recreate the consumer only for these errors; intermittent connection
        // failures are handled inside kafka-node.
        return setImmediate(() => self.consumer.close(true, () => self.connect(topic, onMessage)));
      }
      logger.error(`Kafka error happened: ${JSON.stringify(err)}`);
    });

    self.consumer.on('offsetOutOfRange', (topicObj: any) => {
      topicObj.maxNum = 2;
      self.offset.fetch([topicObj], (err: any, offsets: any) => {
        if (err) return console.error(err);
        // offsets[topic][partition] is an array, so spread it into Math.min
        const min = Math.min(...offsets[topicObj.topic][topicObj.partition]);
        self.consumer.setOffset(topicObj.topic, topicObj.partition, min);
      });
    });

    logger.info(`Listening for the ${topic} messages...`);
  }

  public consume<T>(topic: string, threads: number, callback: (msg: T, done: Function) => void): void {
    const self = this;

    // Handle at most `threads` messages concurrently.
    const q = queue(function (payload: T, cb: any) {
      setImmediate(() => callback(payload, cb));
    }, threads);
    // Fetch more once every queued message has been handled.
    // (async v2 style; in async v3 `drain` is a method: q.drain(() => ...))
    q.drain = function () {
      self.consumer.resume();
    };

    self.connect(topic, (messageWrapper: any) => {
      const message: T = JSON.parse(messageWrapper.value);
      q.push(message);
      // Stop fetching until the queue drains (back pressure).
      self.consumer.pause();
    });

    process.on('SIGINT', () => self.consumer.close(true, () => process.exit()));
  }
}
```
And then usage:
```typescript
const consumer = new Consumer();
consumer.consume<SomeMessage>('some_topic', 4, topicMessageHandler);

async function topicMessageHandler(message: SomeMessage, done: Function): Promise<void> {
  logger.info(`Got message ` + JSON.stringify(message));
  // handler code here
  return done();
}
```
Does it look correct? Is there anything else that should be improved?
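One gap in the usage above: if `topicMessageHandler` throws or its promise rejects before reaching `done()`, the queue never finishes that item. In that case `drain` never fires and the consumer stays paused. A hedged sketch of an adapter that always reaches `done`, even on failure, is shown below. `toCallbackHandler` and its `onError` parameter are hypothetical names, not part of kafka-node or async.

```typescript
// Hypothetical adapter: converts a promise-returning handler into the
// (message, done) callback style used by the queue worker, and makes sure
// done() is reached even when the handler fails.
function toCallbackHandler<T>(
  handler: (msg: T) => Promise<void>,
  onError: (err: unknown, msg: T) => void
): (msg: T, done: () => void) => void {
  return (msg: T, done: () => void) => {
    // Promise.resolve().then(...) also catches handlers that throw synchronously.
    Promise.resolve()
      .then(() => handler(msg))
      .then(
        () => done(),
        (err: unknown) => {
          try {
            onError(err, msg); // e.g. log the failure; the message counts as handled
          } finally {
            done(); // always release the queue slot so drain/resume can happen
          }
        }
      );
  };
}
```

Whether a failed message should be counted as handled, retried, or sent elsewhere is a policy decision. This sketch only guarantees that the queue keeps moving.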
I don't know if this is necessary, but after processing the message we schedule the queue's cb with setImmediate so as not to block the event loop.
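A minimal sketch of what that reply describes: the handler's completion is forwarded to the queue through `setImmediate`. The queue's callback then always runs on a later turn of the event loop, even when the handler finishes synchronously, which gives pending I/O a chance to run between messages. `deferCompletion` is a hypothetical helper, not part of async or kafka-node.

```typescript
type Done = () => void;

// Hypothetical helper: wraps a message handler so that the queue's
// completion callback is deferred with setImmediate instead of being
// invoked on the handler's own call stack.
function deferCompletion<T>(handler: (msg: T, done: Done) => void): (msg: T, cb: Done) => void {
  return (msg: T, cb: Done) => {
    handler(msg, () => setImmediate(cb));
  };
}
```

The result would be used as the queue worker, e.g. `queue(deferCompletion(callback), threads)`, in place of wrapping the handler call itself in `setImmediate`.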
Also, calling close on every error seems a bit excessive. There's logic in the module to handle intermittent connection failures with either ZooKeeper or Kafka. I would only close and recreate the consumer on a FailedToRebalanceConsumerError or a 'LeaderNotAvailable' error (in the latter case the error is a string in an array).
I updated the example, but I'm not sure about these two lines:
```typescript
const failedToRebalanceConsumerError = err.message && err.message.includes('FailedToRebalanceConsumerError');
const leaderNotAvailable = err.message && err.message.includes('LeaderNotAvailable');
```
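Per the earlier reply, 'LeaderNotAvailable' can arrive as a string inside an array rather than as an `Error` with a `message`, so `err.message.includes(...)` would miss it. A hedged sketch of a check that accepts either shape follows. `isRecoverableKafkaError` and `RECOVERABLE_ERRORS` are hypothetical names, and the error shapes come from this thread rather than from verified kafka-node behavior.

```typescript
// Error names that, per this thread, justify closing and recreating the consumer.
const RECOVERABLE_ERRORS = ['FailedToRebalanceConsumerError', 'LeaderNotAvailable'];

// Returns true if `err` mentions one of the recoverable error names, whether it
// is an Error-like object, a plain string, or an array of strings.
function isRecoverableKafkaError(err: unknown): boolean {
  const texts: string[] = [];
  if (Array.isArray(err)) {
    for (const item of err) {
      if (typeof item === 'string') texts.push(item);
    }
  } else if (typeof err === 'string') {
    texts.push(err);
  } else if (err !== null && typeof err === 'object') {
    // Check both message and name defensively (assumption: either may carry it).
    const { message, name } = err as { message?: unknown; name?: unknown };
    if (typeof message === 'string') texts.push(message);
    if (typeof name === 'string') texts.push(name);
  }
  return texts.some(text => RECOVERABLE_ERRORS.some(code => text.includes(code)));
}
```

In the `error` listener, a single `if (isRecoverableKafkaError(err)) { ... }` would then replace the two boolean lines.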