Kafka-node: How do I read data in batches from kafka-node?

Created on 21 Apr 2018 · 10 Comments · Source: SOHU-Co/kafka-node

How do I read data from Kafka in batches? For example: fetch an array of 10 messages, perform some operations on those 10 messages, then get the next 10, and so on.

I have tried a few approaches, but none of them works.

Approach 1: Buffer the records in an array, pause the consumer while the batch is being processed, then resume the consumer.

    import kafka from "kafka-node";

    const client = new kafka.Client("localhost:2181");

    const topics = [
        {
            topic: "webevents4"
        }
    ];
    const options = {
        autoCommit: false,
        fetchMaxWaitMs: 1000,
        fetchMaxBytes: 100,
        encoding: "utf-8",
        fromOffset: false
    };

    const consumer = new kafka.Consumer(client, topics, options);

    let notification_batch_max_size = 5;
    let notification_batch = [];

    // Simulated slow work. setTimeout keeps the event loop free; a busy-wait
    // loop would block every other callback, including Kafka network I/O.
    function someHeavyActivity(batch, time, callback) {
        setTimeout(callback, time);
    }

    consumer.on("message", function(message) {
        // Keep every message, including the one that completes the batch
        notification_batch.push(message);
        if (notification_batch.length < notification_batch_max_size) {
            return;
        }

        // pause() stops fetching; close() would shut the consumer down for good
        consumer.pause();
        const batch = notification_batch;
        notification_batch = [];

        someHeavyActivity(batch, 20000, function() {
            // Commit only after the batch has been handled, then fetch again
            consumer.commit(function(err, data) {
                if (err) {
                    console.log("commit error", err);
                }
                consumer.resume();
            });
        });
    });

    consumer.on("error", function(err) {
        console.log("error", err);
    });

    process.on("SIGINT", function() {
        consumer.close(true, function() {
            process.exit();
        });
    });
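The buffer-then-pause idea from Approach 1 can be isolated from Kafka entirely. Below is a minimal sketch, assuming only a consumer object with `pause()`, `resume()` and `commit(cb)` (the methods the kafka-node `Consumer` code above relies on); `createBatcher` and `processBatch` are hypothetical names, not kafka-node APIs. Because a consumer may still deliver messages it fetched before `pause()` took effect, the sketch keeps those in the buffer for the next batch instead of assuming delivery stops immediately. A fake consumer is used so it runs without a broker.

```javascript
// Sketch only: batch messages from a pausable consumer.
// Assumption: `consumer` exposes pause(), resume() and commit(cb).
// `createBatcher` and `processBatch` are hypothetical, not kafka-node APIs.
function createBatcher(consumer, batchSize, processBatch) {
    let buffer = [];
    let busy = false;

    function flush() {
        if (busy || buffer.length < batchSize) return;
        busy = true;
        consumer.pause();
        const batch = buffer.slice(0, batchSize);
        buffer = buffer.slice(batchSize);
        processBatch(batch, function done() {
            consumer.commit(function (err) {
                if (err) console.error("commit failed", err);
                busy = false;
                consumer.resume();
                flush(); // messages buffered meanwhile may already fill a batch
            });
        });
    }

    // Intended use: consumer.on("message", createBatcher(consumer, 10, processBatch))
    return function onMessage(message) {
        // Messages fetched before pause() took effect can still arrive;
        // they are kept for the next batch instead of being dropped.
        buffer.push(message);
        flush();
    };
}

// Demo with a fake consumer that records the calls it receives.
const calls = [];
const fakeConsumer = {
    pause() { calls.push("pause"); },
    resume() { calls.push("resume"); },
    commit(cb) { calls.push("commit"); cb(null); }
};
const batches = [];
const onMessage = createBatcher(fakeConsumer, 3, (batch, done) => {
    batches.push(batch.map((m) => m.offset));
    done();
});
for (let offset = 0; offset < 7; offset++) onMessage({ offset });

console.log(batches); // [ [ 0, 1, 2 ], [ 3, 4, 5 ] ]
console.log(calls.join(",")); // pause,commit,resume,pause,commit,resume
```

The seventh message (offset 6) stays buffered until enough messages arrive to complete the next batch.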

Approach 2: Using the Consumer Stream

    import kafka from "kafka-node";

    const client = new kafka.Client("localhost:2181");

    const topics = [
        {
            topic: "webevents4",
            highWaterMark: 2,
            offset: 0, // default 0
            partition: 0 // default 0
        }
    ];
    const options = {
        autoCommit: true,
        fetchMaxWaitMs: 1000,
        fetchMaxBytes: 1024 * 1024,
        encoding: "utf8",
        fromOffset: true
    };

    // ConsumerStream emits message objects, so setEncoding() (meant for byte
    // streams) is dropped; the `encoding` option above decodes message values.
    const consumer = new kafka.ConsumerStream(client, topics, options);

    let data = "";
    consumer.on("data", function(chunk) {
        if (chunk !== null && typeof chunk === "object") {
            try {
                chunk = JSON.stringify(chunk);
            } catch (e) {
                // `next` is not defined in this handler; log and skip instead
                console.log("error", e);
                return;
            }
        }
        console.log(chunk.toString());
        data += chunk;
    });
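Approach 2 reads the stream in flowing mode, where `'data'` keeps firing as fast as messages arrive, so nothing ever waits for a batch to be processed. A minimal sketch of batching on top of any object-mode Node.js `Readable` (assuming `ConsumerStream` behaves like one): pause after `size` chunks, await a hypothetical async `handleBatch`, then resume. An in-memory `Readable.from` source stands in for Kafka.

```javascript
const { Readable } = require("stream");

// Sketch: batch an object-mode Readable in flowing mode by pausing it after
// every `size` chunks. `handleBatch` is a hypothetical async function.
function batchFlowing(stream, size, handleBatch) {
    return new Promise((resolve, reject) => {
        let batch = [];
        let pending = Promise.resolve(); // the batch currently being handled

        stream.on("data", (chunk) => {
            batch.push(chunk);
            if (batch.length < size) return;
            const full = batch;
            batch = [];
            stream.pause(); // no more 'data' events until resume()
            pending = handleBatch(full).then(() => {
                stream.resume();
            });
            pending.catch(reject);
        });

        stream.on("end", () => {
            // 'end' may fire while the last full batch is still pending,
            // so wait for it before handling the remainder.
            const rest = batch;
            batch = [];
            pending
                .then(() => (rest.length > 0 ? handleBatch(rest) : undefined))
                .then(resolve, reject);
        });

        stream.on("error", reject);
    });
}

// Demo with an in-memory stream standing in for ConsumerStream.
const seen = [];
const finished = batchFlowing(Readable.from([1, 2, 3, 4, 5, 6, 7]), 3, async (batch) => {
    seen.push(batch);
});
finished.then(() => console.log(seen)); // [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7 ] ]
```

Calling `pause()` inside a `'data'` handler stops further `'data'` events on a standard Node.js `Readable`; whether it also stops the underlying Kafka fetches depends on how `ConsumerStream` implements reading.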

Approach 3: Transform Stream with ConsumerStream

    import kafka from "kafka-node";
    import { Transform } from "stream";

    const client = new kafka.Client("localhost:2181");

    const topics = [
        {
            topic: "webevents4",
            highWaterMark: 2,
            offset: 0, // default 0
            partition: 0 // default 0
        }
    ];
    const options = {
        autoCommit: true,
        fetchMaxWaitMs: 1000,
        fetchMaxBytes: 1024 * 1024,
        encoding: "utf8",
        fromOffset: true
    };

    const consumer = new kafka.ConsumerStream(client, topics, options);

    const messageTransform = new Transform({
        objectMode: true,
        decodeStrings: true,
        transform(message, encoding, callback) {
            console.log(`Received message ${message.value} transforming input`);
            // The callback must be called for every message; without it the
            // transform never accepts the next one. Nothing is pushed downstream here.
            callback();
        }
    });

    consumer.on("error", function(err) {
        console.log("error", err);
    });

    process.on("SIGINT", function() {
        consumer.close(true, function() {
            process.exit();
        });
    });

    consumer.pipe(messageTransform);
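The Transform in Approach 3 can also do the grouping itself. A sketch, assuming an object-mode source like `ConsumerStream`: `createBatchTransform` (a hypothetical helper, not part of kafka-node) collects messages into arrays of `size`, emits each array as one chunk, and flushes the partial remainder when the source ends. A `Writable` at the end of the `pipeline` then sees one whole batch per `write()` call, and because it only calls its callback when done, backpressure flows back to the source.

```javascript
const { Readable, Transform, Writable, pipeline } = require("stream");

// Sketch: an object-mode Transform that groups messages into arrays of `size`.
// `createBatchTransform` is a hypothetical helper, not a kafka-node API.
function createBatchTransform(size) {
    let batch = [];
    return new Transform({
        objectMode: true,
        transform(message, encoding, callback) {
            batch.push(message);
            if (batch.length < size) return callback(); // always call back, or the stream stalls
            const full = batch;
            batch = [];
            callback(null, full); // emit one array downstream
        },
        flush(callback) {
            // Emit the final, partial batch when the source ends.
            if (batch.length > 0) this.push(batch);
            callback();
        }
    });
}

// Demo with an in-memory source instead of a ConsumerStream.
const received = [];
const source = Readable.from(Array.from({ length: 25 }, (_, i) => ({ value: `msg ${i}` })));
const sink = new Writable({
    objectMode: true,
    write(batch, encoding, callback) {
        received.push(batch.length); // process one whole batch here
        callback();
    }
});
const finished = new Promise((resolve, reject) => {
    pipeline(source, createBatchTransform(10), sink, (err) => (err ? reject(err) : resolve(received)));
});
finished.then((sizes) => console.log(sizes)); // [ 10, 10, 5 ]
```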

Any help and guidance will be much appreciated.

Thanks

All 10 comments

One more scenario: what if something goes wrong in the system while those 10 messages are being processed? Say I have processed 5 of them and want to commit those 5, so that the next time consumption starts, it reads from the 6th message.
How can this be achieved with autoCommit set to false and the consumer.commit() function?
Please help with this.
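For the partial-failure case, the key detail is which offset to commit: by Kafka convention the committed offset is the offset of the *next* message to read, so after successfully processing offsets 100 to 104 you commit 105. A sketch, assuming messages carry `topic`, `partition` and `offset` fields as kafka-node message objects do; `processUntilFailure` and `offsetsToCommit` are hypothetical helpers. The actual commit call is left out because it depends on which client API you use.

```javascript
// Sketch: process a batch in order, stop at the first failure, and work out
// which offsets are safe to commit. Helper names are hypothetical.
function processUntilFailure(batch, handle) {
    let processed = 0;
    for (const message of batch) {
        try {
            handle(message);
        } catch (err) {
            console.error(`stopped at offset ${message.offset}:`, err.message);
            break;
        }
        processed++;
    }
    return processed;
}

// Kafka commits the offset of the *next* message to read, i.e. last processed + 1.
// Offsets within a partition are ordered, so the last entry per partition wins.
function offsetsToCommit(batch, processedCount) {
    const next = new Map();
    for (const message of batch.slice(0, processedCount)) {
        next.set(`${message.topic}:${message.partition}`, {
            topic: message.topic,
            partition: message.partition,
            offset: message.offset + 1
        });
    }
    return [...next.values()];
}

// Demo: 10 messages at offsets 100..109; the 6th one (offset 105) fails.
const batch = Array.from({ length: 10 }, (_, i) => ({
    topic: "webevents4",
    partition: 0,
    offset: 100 + i,
    value: `event ${i}`
}));
const processed = processUntilFailure(batch, (message) => {
    if (message.offset === 105) throw new Error("simulated failure");
});
const commits = offsetsToCommit(batch, processed);
console.log(processed, commits); // 5 [ { topic: 'webevents4', partition: 0, offset: 105 } ]
```

Committing offset 105 means the next consumer session starts at the 6th message, which is the behavior asked for above.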

Try using a ConsumerGroupStream.

@hyperlink, is that the solution to my problem, i.e. batches of 10?

@sunil-lulla that is what I would recommend. You can read the stream in paused mode.
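One way to read a stream "in paused mode" in modern Node.js is async iteration, which only pulls the next message when the loop asks for it. A sketch, assuming the consumer stream is a standard object-mode `Readable` (as `ConsumerGroupStream` should be, though how far its Kafka fetching backs off is up to its implementation); `inBatches` is a hypothetical helper, and `Readable.from` stands in for Kafka.

```javascript
const { Readable } = require("stream");

// Sketch: pull fixed-size batches from an object-mode Readable with async
// iteration. While the loop body is awaiting, nothing is read from the
// stream, so its buffer fills up to highWaterMark and reading stops.
async function* inBatches(stream, size) {
    let batch = [];
    for await (const message of stream) {
        batch.push(message);
        if (batch.length === size) {
            yield batch;
            batch = [];
        }
    }
    if (batch.length > 0) yield batch; // trailing partial batch
}

async function run() {
    // In-memory stand-in for a ConsumerGroupStream.
    const source = Readable.from(Array.from({ length: 23 }, (_, i) => ({ offset: i })));
    const sizes = [];
    for await (const batch of inBatches(source, 10)) {
        // Await the real work (e.g. database writes) here; the next batch
        // is not pulled until this iteration finishes.
        sizes.push(batch.length);
    }
    return sizes;
}

const result = run();
result.then((sizes) => console.log(sizes)); // [ 10, 10, 3 ]
```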

@hyperlink, do I need to use streams on the producer side as well, or will using them only on the consumer side work?

It's up to you, @sunil-lulla. I recommend the ConsumerGroupStream on the consumer side because it will only commit what you have actually read from the stream.

@sunil-lulla, could you share what your final approach was? I am in the same situation: I want to read a batch, pause, process it, and only then take the next batch. It would also help if you could share sample code.

I followed the first approach by @sunil-lulla and I have the following code:

    consumer.resume();
    const events = [];
    consumer.on('message', msg => {
        events.push(msg.value);
    });

    setTimeout(() => {
        consumer.pause();
        console.log(`${events.length} messages found`);
        let count = 0;
        for (let i = 0; i < events.length; i += 10) {
            const subEvents = events.slice(i, i + 10);
            for (const event of subEvents) {
                const userEvent = JSON.parse(event);
                new UserEventsModel(userEvent).save().then(() => {
                    count++;
                    console.log(`${((count / events.length) * 100).toFixed(2)}% events saved`);
                });
            }
        }
    }, 3 * 1000);

This code runs inside a cron job. The issue is that the 'message' callback still fires after the consumer has been paused. It also doesn't catch up with messages that were already in the topic partition but had not yet been processed by the consumer. Any suggestions?
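In the snippet above, slicing `events` into groups of 10 does not limit anything: every `save()` starts immediately inside the same loop, so all of them run at once. If the goal is to write 10 at a time, each group has to be awaited before the next one starts. A sketch, where `saveEvent` stands in for a hypothetical persistence call such as `new UserEventsModel(e).save()` and `saveInChunks` is a hypothetical helper; a fake save that counts concurrent calls replaces the database.

```javascript
// Sketch: save items in chunks of `size`, waiting for every save in a chunk
// before starting the next one. `saveEvent` is a hypothetical async save.
async function saveInChunks(items, size, saveEvent, onProgress) {
    let done = 0;
    for (let i = 0; i < items.length; i += size) {
        const chunk = items.slice(i, i + size);
        await Promise.all(chunk.map((item) => saveEvent(item)));
        done += chunk.length;
        if (onProgress) onProgress(done, items.length);
    }
    return done;
}

// Demo: a fake save that tracks how many saves run at the same time.
let inFlight = 0;
let maxInFlight = 0;
function fakeSave() {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    return new Promise((resolve) => setTimeout(() => {
        inFlight--;
        resolve();
    }, 5));
}

const sampleEvents = Array.from({ length: 25 }, (_, i) => JSON.stringify({ id: i }));
const demo = saveInChunks(sampleEvents.map((e) => JSON.parse(e)), 10, fakeSave, (done, total) =>
    console.log(`${((done / total) * 100).toFixed(2)}% events saved`)
);
demo.then((n) => console.log(`saved ${n}, max concurrent saves: ${maxInFlight}`));
// saved 25, max concurrent saves: 10
```

With a real consumer, one option is to keep it paused until the returned promise resolves, and only then commit and resume.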


Has this problem been resolved? Please advise.

You can use a ConsumerGroupStream with async.queue, pushing each message onto the queue.
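The queue suggestion can be illustrated without extra dependencies. The sketch below is a tiny concurrency-limited queue fed from a stream, standing in for `async.queue` from the `async` package (whose actual API differs); `consumeWithQueue` and its `concurrency`/`maxBacklog` options are hypothetical. The stream is paused while the backlog is full and resumed as workers finish, so messages are never pulled faster than they can be handled.

```javascript
const { Readable } = require("stream");

// Sketch: a concurrency-limited queue fed from a stream, a dependency-free
// stand-in for async.queue. Names and options here are hypothetical.
function consumeWithQueue(stream, worker, { concurrency = 2, maxBacklog = 10 } = {}) {
    return new Promise((resolve, reject) => {
        const backlog = [];
        let running = 0;
        let ended = false;

        function next() {
            while (running < concurrency && backlog.length > 0) {
                const item = backlog.shift();
                running++;
                worker(item).then(() => {
                    running--;
                    next();
                    // Backlog has room again: let the stream deliver more.
                    if (stream.isPaused() && backlog.length < maxBacklog) stream.resume();
                }, reject);
            }
            if (ended && running === 0 && backlog.length === 0) resolve();
        }

        stream.on("data", (item) => {
            backlog.push(item);
            if (backlog.length >= maxBacklog) stream.pause(); // backpressure
            next();
        });
        stream.on("end", () => {
            ended = true;
            next();
        });
        stream.on("error", reject);
    });
}

// Demo with an in-memory stream standing in for a ConsumerGroupStream.
const processed = [];
let active = 0;
let peak = 0;
const done = consumeWithQueue(
    Readable.from(Array.from({ length: 30 }, (_, i) => i)),
    (n) => {
        active++;
        peak = Math.max(peak, active);
        return new Promise((resolve) => setTimeout(() => {
            active--;
            processed.push(n);
            resolve();
        }, 1));
    },
    { concurrency: 3, maxBacklog: 5 }
);
done.then(() => console.log(`processed ${processed.length}, peak concurrency ${peak}`));
```

Pausing on a full backlog rather than on every message keeps a few messages ready for the workers while still bounding memory.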
