How do I read data from Kafka in batches? For example, read an array of 10 messages, perform some operations on those 10 messages, then get another 10, and so on.
I have tried a few approaches, but none of them works.
Approach 1: Basic logic that puts records in an array, pauses the consumer while the batch is processed, and then resumes the consumer.
import kafka from "kafka-node"

const client = new kafka.Client("localhost:2181");
const topics = [
  {
    topic: "webevents4"
  }
];
const options = {
  autoCommit: false,
  fetchMaxWaitMs: 1000,
  fetchMaxBytes: 100,
  encoding: "utf-8",
  //offset:true
  fromOffset: false,
};
const consumer = new kafka.Consumer(client, topics, options);
let notification_batch_max_size=5;
let notification_batch = [];

// Stand-in for real work: blocks the event loop for `time` milliseconds.
function someHeavyActivity(time) {
  const stop = new Date().getTime() + time;
  while (new Date().getTime() < stop) {
    ;
  }
}

consumer.on("message", function(message) {
  // Always keep the incoming message, then check whether the batch is full.
  notification_batch.push(message);
  if (notification_batch.length < notification_batch_max_size) {
    return;
  }
  // Batch is full: pause (not close) the consumer so it can be resumed later.
  consumer.pause();
  someHeavyActivity(20000);
  notification_batch = [];
  // Commit once the batch has been processed, then start fetching again.
  consumer.commit(function(err, data) {
    if (err) {
      console.log("commit error", err);
    }
    consumer.resume();
  });
});
consumer.on("error", function(err) {
  console.log("error", err);
});
process.on("SIGINT", function() {
  consumer.close(true, function() {
    process.exit();
  });
});
Approach 2: Using the Consumer Stream
import kafka from "kafka-node"
const client = new kafka.Client("localhost:2181");
const topics = [
{
topic: "webevents4",
highWaterMark:2,
offset: 0, //default 0
partition: 0 // default 0
}
];
const options = {
autoCommit: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
encoding: "utf8",
fromOffset: true,
};
const consumer = new kafka.ConsumerStream(client, topics, options);
consumer.setEncoding('UTF8');
let data ="";
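consumer.on('data', function(chunk) {
  // Each chunk may be a message object, so serialize it before appending.
  if (chunk !== null && typeof chunk === 'object') {
    // `next` is not defined in this scope, so log the error and skip the chunk.
    try { chunk = JSON.stringify(chunk) } catch (e) { console.log("error", e); return; }
  }
  console.log(chunk.toString());
  data += chunk;
});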
Approach 3: Transform Stream with ConsumerStream
import kafka from "kafka-node"
const client = new kafka.Client("localhost:2181");
const topics = [
{
topic: "webevents4",
highWaterMark:2,
offset: 0, //default 0
partition: 0 // default 0
}
];
const options = {
autoCommit: true,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
encoding: "utf8",
//offset:true
fromOffset: true,
};
const consumer = new kafka.ConsumerStream(client, topics, options);
const Transform = require('stream').Transform;
const messageTransform = new Transform({
objectMode: true,
decodeStrings: true,
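transform (message, encoding, callback) {
console.log(`Received message ${message.value} transforming input`);
// Signal that this message is done; without this call the stream stalls after the first message.
callback();
}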
});
consumer.on("error", function(err) {
console.log("error", err);
});
process.on("SIGINT", function() {
consumer.close(true, function() {
process.exit();
});
});
consumer.pipe(messageTransform);
Any help and guidance will be much appreciated.
Thanks
I have an additional scenario: what if something goes wrong in the system while those 10 messages are being processed? Say I have processed 5 of them and want to commit those 5, so that the next time consuming starts, it reads from the 6th message.
How can this be achieved with autoCommit set to false and the consumer.commit() function?
Please help with this.
Try using a ConsumerGroupStream.
@hyperlink Is that the solution to my problem, basically batches of 10?
@sunil-lulla that is what I would recommend. You can read the stream in paused mode.
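To illustrate what reading in paused mode could look like, here is a minimal sketch. It uses a plain object-mode `Readable` from Node's `stream` module as a stand-in for a ConsumerGroupStream, so it runs without a broker. The helper name `takeBatch` and the fake messages are assumptions made for this example, not part of kafka-node.

```javascript
const { Readable } = require("stream");

// Pull up to `size` objects out of an object-mode stream in paused mode.
// Returns fewer than `size` items (possibly none) when the buffer runs dry.
function takeBatch(stream, size) {
  const batch = [];
  while (batch.length < size) {
    const message = stream.read(); // null when nothing is buffered
    if (message === null) break;
    batch.push(message);
  }
  return batch;
}

// Stand-in for a ConsumerGroupStream (assumption): an object-mode Readable
// with 25 fake messages already buffered.
const fakeStream = new Readable({ objectMode: true, read() {} });
for (let offset = 0; offset < 25; offset++) {
  fakeStream.push({ offset, value: `event-${offset}` });
}

const batchSizes = [];
let batch;
while ((batch = takeBatch(fakeStream, 10)).length > 0) {
  // Process the batch here before asking for the next one.
  batchSizes.push(batch.length);
}
console.log(batchSizes);
```

With a real stream, the buffer can be empty when you ask for a batch, so you would wait for the stream's `readable` event before calling the helper again.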
@hyperlink Do I need to use streams on the producer side as well, or is using them only on the consumer enough?
It's up to you @sunil-lulla. I recommend the ConsumerGroupStream for the consumer because it only commits what you have read from the stream.
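For the partial-failure scenario raised earlier (5 of 10 messages processed before an error), one possible approach is to process the batch in order, stop at the first failure, and commit only the last message that succeeded. The sketch below shows that bookkeeping with fake messages; `processBatch` and the handler are hypothetical names, and the commit call appears only in a comment because it needs a live consumer.

```javascript
// Process messages in order and stop at the first failure.
// `handler` is a hypothetical per-message function that throws on error.
function processBatch(batch, handler) {
  let lastProcessed = null;
  for (const message of batch) {
    try {
      handler(message);
    } catch (error) {
      // Report how far we got so the caller can commit up to that point.
      return { lastProcessed, error };
    }
    lastProcessed = message;
  }
  return { lastProcessed, error: null };
}

// Ten fake messages; the handler fails on the sixth one (offset 5).
const messages = Array.from({ length: 10 }, (_, offset) => ({
  topic: "webevents4",
  partition: 0,
  offset,
}));
const result = processBatch(messages, (message) => {
  if (message.offset === 5) throw new Error("simulated failure");
});

// With a real ConsumerGroupStream and autoCommit: false, this is the message
// one would pass to its commit method (check the signature for your version):
//   consumerGroupStream.commit(result.lastProcessed, true, callback);
console.log(result.lastProcessed.offset, result.error.message);
```

The handler here is synchronous to keep the sketch short. A handler that saves to a database would be asynchronous, and the loop would need to await each call before moving on.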
@sunil-lulla could you share your final approach? I am in the same situation: I want to read a batch, pause, and take the next batch only once the first one has been processed. Sample code would also be appreciated.
I followed the first approach by @sunil-lulla and I have the following code:
```javascript
consumer.resume();
const events = [];
consumer.on('message', msg => {
  events.push(msg.value);
});
setTimeout(() => {
  consumer.pause();
  console.log(`${events.length} messages found`);
  let count = 0;
  for (let i = 0; i < events.length; i += 10) {
    const subEvents = events.slice(i, i + 10);
    for (const event of subEvents) {
      const userEvent = JSON.parse(event);
      new UserEventsModel(userEvent).save().then(() => {
        count++;
        console.log(`${((count / events.length) * 100).toFixed(2)}% events saved`);
      });
    }
  }
}, 3 * 1000);
```
This code runs inside a cron job. The issue is that the message callback still fires after the consumer is paused. It also does not catch up with messages that were already in the topic partition but had not yet been processed by the consumer. Any suggestions on this?
Is this problem resolved? Please advise.
You can use ConsumerGroupStream with async.queue, pushing items to the queue.
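As a dependency-free sketch of that queueing idea, the hypothetical `BatchBuffer` class below stands in for the queue (it is not the async library's API). It collects messages, pauses the source while a batch of 10 is being handled, and keeps any messages that still arrive after the pause for the next batch. The fake source with `pause()` and `resume()` is an assumption that replaces a real consumer.

```javascript
// Collects messages into fixed-size batches and applies backpressure to the source.
class BatchBuffer {
  constructor(source, size, onBatch) {
    this.source = source;   // anything with pause() and resume()
    this.size = size;
    this.onBatch = onBatch; // called as onBatch(batch, done)
    this.pending = [];
    this.busy = false;
  }

  push(message) {
    // Messages that arrive while a batch is in flight are simply buffered.
    this.pending.push(message);
    this.flush();
  }

  flush() {
    if (this.busy || this.pending.length < this.size) return;
    this.busy = true;
    this.source.pause(); // stop fetching while the batch is processed
    const batch = this.pending.splice(0, this.size);
    this.onBatch(batch, () => {
      this.busy = false;
      this.source.resume();
      this.flush(); // buffered messages may already form the next batch
    });
  }
}

// Fake source (assumption) that records pause/resume calls.
const calls = [];
const fakeSource = {
  pause: () => calls.push("pause"),
  resume: () => calls.push("resume"),
};
const batches = [];
let finish = null;
const buffer = new BatchBuffer(fakeSource, 10, (batch, done) => {
  batches.push(batch);
  finish = done; // call once the batch has been saved
});

for (let i = 0; i < 25; i++) buffer.push({ offset: i });
finish(); // first batch done, second batch is handed out
finish(); // second batch done, 5 messages remain buffered
console.log(batches.map((b) => b.length), buffer.pending.length, calls);
```

With a real consumer, `buffer.push` would be called from the `message` or `data` handler, and `done` would be called after the batch has been saved and its offsets committed.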