Kafka-node: Async consumer example [Question]

Created on 1 Nov 2016  ·  4 Comments  ·  Source: SOHU-Co/kafka-node

Does anyone have a ready-to-use example of a consumer with BACK PRESSURE? I read in the docs that I can use the async library, but I still don't really understand where to put the consumer.pause() call. This could also be added to the docs, since I think it is the most common use case. Thanks!

question

Most helpful comment

OK, so I wrote the following snippet:

import config from "./Config";
import logger from "./Logger";
import { queue } from 'async';
import { Client, HighLevelConsumer, Offset } from 'kafka-node';


/**
 * Kafka consumer wrapper that applies back pressure through an `async` queue:
 * the underlying consumer is paused whenever a message is queued and resumed
 * once the queue has drained.
 */
export class Consumer {
    client: Client;
    consumer: HighLevelConsumer;
    offset: Offset;


    /**
     * Creates a new client, high-level consumer and offset helper for `topic`
     * and stores them on this instance.
     *
     * No event listeners are registered here; callers must attach them to the
     * freshly created `this.consumer` afterwards.
     */
    private connect (topic: string) {
        this.client = new Client(config.kafka.connectionString, config.kafka.clientId);
        this.consumer = new HighLevelConsumer(this.client, [{ topic: topic }], { groupId: config.kafka.clientId });
        this.offset = new Offset(this.client);

        logger.info(`Listening for the ${topic} messages...`);
    }


    /**
     * Flattens an error of unknown shape (string, array, Error-like object)
     * into a single human-readable string.
     *
     * `JSON.stringify` alone is not enough: it yields `{}` for plain `Error`
     * objects because their `message`/`stack` are not enumerable.
     */
    private static errorText (err: any): string {
        if (err == null) {
            return '';
        }
        if (typeof err === 'string') {
            return err;
        }
        if (Array.isArray(err)) {
            return err.map((entry: any) => Consumer.errorText(entry)).join(' ');
        }
        const parts = [err.name, err.message].filter((part: any) => typeof part === 'string' && part.length > 0);
        if (parts.length > 0) {
            return parts.join(': ');
        }
        try {
            return String(JSON.stringify(err));
        } catch (stringifyErr) {
            return String(err);
        }
    }


    /**
     * Tells whether `err` is one of the errors after which the consumer is
     * closed and re-created (FailedToRebalanceConsumerError, LeaderNotAvailable).
     *
     * NOTE(review): the exact error shapes emitted by kafka-node are not visible
     * here. The marker is therefore searched in the error name, its message and,
     * when the error is a string or an array, in the entries themselves —
     * confirm against the kafka-node version in use.
     */
    private static requiresReconnect (err: any): boolean {
        const text = Consumer.errorText(err);
        return text.includes('FailedToRebalanceConsumerError') || text.includes('LeaderNotAvailable');
    }


    /**
     * Registers the `error`, `offsetOutOfRange` and `message` listeners on the
     * current `this.consumer`.
     *
     * Called once from `consume()` and again after every reconnect, because
     * `connect()` replaces `this.consumer` with a new instance that has no
     * listeners of its own.
     *
     * @param topic   topic to re-subscribe to on reconnect
     * @param enqueue receives every successfully parsed message
     */
    private attachHandlers<T> (topic: string, enqueue: (message: T) => void): void {
        // Bind to the instances that exist right now, so late events from an
        // already replaced consumer never touch its successor.
        const consumer = this.consumer;
        const offset = this.offset;
        let reconnecting = false;

        consumer.on('error', (err: any) => {
            if (Consumer.requiresReconnect(err)) {
                // Several errors may arrive while the close is still pending;
                // only the first one may trigger a reconnect.
                if (reconnecting) {
                    return;
                }
                reconnecting = true;
                setImmediate(() => consumer.close(true, () => {
                    this.connect(topic);
                    this.attachHandlers(topic, enqueue);
                }));
                return;
            }
            logger.error(`Kafka error happened: ${Consumer.errorText(err)}`);
        });

        consumer.on('offsetOutOfRange', (topicObj: any) => {
            topicObj.maxNum = 2;
            offset.fetch([topicObj], (err: any, offsets: any) => {
                if (err) {
                    logger.error(`Kafka offset fetch failed: ${Consumer.errorText(err)}`);
                    return;
                }
                const candidates = offsets && offsets[topicObj.topic] && offsets[topicObj.topic][topicObj.partition];
                if (!Array.isArray(candidates) || candidates.length === 0) {
                    logger.error(`Kafka offset fetch returned no offsets for ${topicObj.topic}/${topicObj.partition}`);
                    return;
                }
                // Spread the array: Math.min(array) with more than one element is NaN.
                consumer.setOffset(topicObj.topic, topicObj.partition, Math.min(...candidates));
            });
        });

        consumer.on('message', (messageWrapper: any) => {
            let message: T;
            try {
                message = JSON.parse(messageWrapper.value);
            } catch (parseErr) {
                // A malformed payload must not take the whole process down.
                logger.error(`Skipping Kafka message with invalid JSON: ${Consumer.errorText(parseErr)}`);
                return;
            }
            enqueue(message);
            // Back pressure: stop fetching until the queue reports drain.
            consumer.pause();
        });
    }


    /**
     * Starts consuming `topic` and hands every JSON-decoded message to
     * `callback`, running at most `threads` callbacks concurrently.
     *
     * @param topic    Kafka topic to consume
     * @param threads  concurrency of the internal work queue
     * @param callback worker; it must invoke `done` when the message is handled,
     *                 otherwise the queue never drains and the consumer stays paused
     */
    public consume<T>(topic: string, threads: number, callback: (msg: T, done: Function) => void): void {
        this.connect(topic);

        // Reads this.consumer at signal time, so it closes the current instance
        // even after a reconnect.
        process.on('SIGINT', () => this.consumer.close(true, () => process.exit()));

        const q = queue((payload: T, cb: any) => {
            // Defer the worker so a synchronous callback cannot block the event loop.
            setImmediate(() => callback(payload, cb));
        }, threads);

        q.drain = () => {
            this.consumer.resume();
        };

        this.attachHandlers<T>(topic, (message: T) => {
            q.push(message);
        });
    }
}

And then usage:

// Consume 'some_topic' with up to 4 messages being handled concurrently.
const consumer = new Consumer();
consumer.consume<SomeMessage>('some_topic', 4, topicMessageHandler);

/**
 * Example worker passed to `Consumer.consume`.
 *
 * `done` is always invoked, even when the handler code throws: otherwise the
 * queue would never drain and the consumer would stay paused. A failure is
 * logged and forwarded to `done` as its first argument.
 */
async function topicMessageHandler (message: SomeMessage, done: Function): Promise<void> {
    try {
        logger.info(`Got message ` + JSON.stringify(message));

        // handler code here
    } catch (err) {
        logger.error(`Message handler failed: ${String(err)}`);
        return done(err);
    }
    return done();
}

Does it look correct? Is there anything else that should be improved here?

All 4 comments

Your message handler should call consumer.pause() and push the message onto the queue.

OK, so I wrote the following snippet:

import config from "./Config";
import logger from "./Logger";
import { queue } from 'async';
import { Client, HighLevelConsumer, Offset } from 'kafka-node';


/**
 * Kafka consumer wrapper that applies back pressure through an `async` queue:
 * the underlying consumer is paused whenever a message is queued and resumed
 * once the queue has drained.
 */
export class Consumer {
    client: Client;
    consumer: HighLevelConsumer;
    offset: Offset;


    /**
     * Creates a new client, high-level consumer and offset helper for `topic`
     * and stores them on this instance.
     *
     * No event listeners are registered here; callers must attach them to the
     * freshly created `this.consumer` afterwards.
     */
    private connect (topic: string) {
        this.client = new Client(config.kafka.connectionString, config.kafka.clientId);
        this.consumer = new HighLevelConsumer(this.client, [{ topic: topic }], { groupId: config.kafka.clientId });
        this.offset = new Offset(this.client);

        logger.info(`Listening for the ${topic} messages...`);
    }


    /**
     * Flattens an error of unknown shape (string, array, Error-like object)
     * into a single human-readable string.
     *
     * `JSON.stringify` alone is not enough: it yields `{}` for plain `Error`
     * objects because their `message`/`stack` are not enumerable.
     */
    private static errorText (err: any): string {
        if (err == null) {
            return '';
        }
        if (typeof err === 'string') {
            return err;
        }
        if (Array.isArray(err)) {
            return err.map((entry: any) => Consumer.errorText(entry)).join(' ');
        }
        const parts = [err.name, err.message].filter((part: any) => typeof part === 'string' && part.length > 0);
        if (parts.length > 0) {
            return parts.join(': ');
        }
        try {
            return String(JSON.stringify(err));
        } catch (stringifyErr) {
            return String(err);
        }
    }


    /**
     * Tells whether `err` is one of the errors after which the consumer is
     * closed and re-created (FailedToRebalanceConsumerError, LeaderNotAvailable).
     *
     * NOTE(review): the exact error shapes emitted by kafka-node are not visible
     * here. The marker is therefore searched in the error name, its message and,
     * when the error is a string or an array, in the entries themselves —
     * confirm against the kafka-node version in use.
     */
    private static requiresReconnect (err: any): boolean {
        const text = Consumer.errorText(err);
        return text.includes('FailedToRebalanceConsumerError') || text.includes('LeaderNotAvailable');
    }


    /**
     * Registers the `error`, `offsetOutOfRange` and `message` listeners on the
     * current `this.consumer`.
     *
     * Called once from `consume()` and again after every reconnect, because
     * `connect()` replaces `this.consumer` with a new instance that has no
     * listeners of its own.
     *
     * @param topic   topic to re-subscribe to on reconnect
     * @param enqueue receives every successfully parsed message
     */
    private attachHandlers<T> (topic: string, enqueue: (message: T) => void): void {
        // Bind to the instances that exist right now, so late events from an
        // already replaced consumer never touch its successor.
        const consumer = this.consumer;
        const offset = this.offset;
        let reconnecting = false;

        consumer.on('error', (err: any) => {
            if (Consumer.requiresReconnect(err)) {
                // Several errors may arrive while the close is still pending;
                // only the first one may trigger a reconnect.
                if (reconnecting) {
                    return;
                }
                reconnecting = true;
                setImmediate(() => consumer.close(true, () => {
                    this.connect(topic);
                    this.attachHandlers(topic, enqueue);
                }));
                return;
            }
            logger.error(`Kafka error happened: ${Consumer.errorText(err)}`);
        });

        consumer.on('offsetOutOfRange', (topicObj: any) => {
            topicObj.maxNum = 2;
            offset.fetch([topicObj], (err: any, offsets: any) => {
                if (err) {
                    logger.error(`Kafka offset fetch failed: ${Consumer.errorText(err)}`);
                    return;
                }
                const candidates = offsets && offsets[topicObj.topic] && offsets[topicObj.topic][topicObj.partition];
                if (!Array.isArray(candidates) || candidates.length === 0) {
                    logger.error(`Kafka offset fetch returned no offsets for ${topicObj.topic}/${topicObj.partition}`);
                    return;
                }
                // Spread the array: Math.min(array) with more than one element is NaN.
                consumer.setOffset(topicObj.topic, topicObj.partition, Math.min(...candidates));
            });
        });

        consumer.on('message', (messageWrapper: any) => {
            let message: T;
            try {
                message = JSON.parse(messageWrapper.value);
            } catch (parseErr) {
                // A malformed payload must not take the whole process down.
                logger.error(`Skipping Kafka message with invalid JSON: ${Consumer.errorText(parseErr)}`);
                return;
            }
            enqueue(message);
            // Back pressure: stop fetching until the queue reports drain.
            consumer.pause();
        });
    }


    /**
     * Starts consuming `topic` and hands every JSON-decoded message to
     * `callback`, running at most `threads` callbacks concurrently.
     *
     * @param topic    Kafka topic to consume
     * @param threads  concurrency of the internal work queue
     * @param callback worker; it must invoke `done` when the message is handled,
     *                 otherwise the queue never drains and the consumer stays paused
     */
    public consume<T>(topic: string, threads: number, callback: (msg: T, done: Function) => void): void {
        this.connect(topic);

        // Reads this.consumer at signal time, so it closes the current instance
        // even after a reconnect.
        process.on('SIGINT', () => this.consumer.close(true, () => process.exit()));

        const q = queue((payload: T, cb: any) => {
            // Defer the worker so a synchronous callback cannot block the event loop.
            setImmediate(() => callback(payload, cb));
        }, threads);

        q.drain = () => {
            this.consumer.resume();
        };

        this.attachHandlers<T>(topic, (message: T) => {
            q.push(message);
        });
    }
}

And then usage:

// Consume 'some_topic' with up to 4 messages being handled concurrently.
const consumer = new Consumer();
consumer.consume<SomeMessage>('some_topic', 4, topicMessageHandler);

/**
 * Example worker passed to `Consumer.consume`.
 *
 * `done` is always invoked, even when the handler code throws: otherwise the
 * queue would never drain and the consumer would stay paused. A failure is
 * logged and forwarded to `done` as its first argument.
 */
async function topicMessageHandler (message: SomeMessage, done: Function): Promise<void> {
    try {
        logger.info(`Got message ` + JSON.stringify(message));

        // handler code here
    } catch (err) {
        logger.error(`Message handler failed: ${String(err)}`);
        return done(err);
    }
    return done();
}

Does it look correct? Is there anything else that should be improved here?

I don't know if this is necessary, but after processing the message we call the queue's cb inside a setImmediate so as not to block the event loop.

Also, calling close on every error seems a bit excessive. There is logic in the module to handle intermittent connection failures with either ZooKeeper or Kafka. I would only close and re-create the consumer on a FailedToRebalanceConsumerError or a 'LeaderNotAvailable' error (in the latter case the error is a string inside an array).

I updated the example, but I'm not sure about these two lines:

const failedToRebalanceConsumerError = err.message && err.message.includes('FailedToRebalanceConsumerError');
const leaderNotAvailable = err.message && err.message.includes('LeaderNotAvailable');
Was this page helpful?
0 / 5 - 0 ratings