Hello,
Can I process more than one job (a batch) in the same context?
// something like this (hypothetical: the processor receives several jobs at once)
queue.process(function(jobs, done) {
  var messages = [];
  for (var i = 0; i < jobs.length; i++) {
    messages.push(jobs[i].data);
  }
  batchWrite(messages);
  done();
});
Are you looking for concurrent job processing? If so, please see the full documentation and check Queue#process:
https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueprocess
If you are really looking for a way to handle multiple jobs with ONE process execution, I am convinced you should change your code design. Just pass all the data you want to process in a batch along with ONE job.
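A minimal sketch of that advice, assuming the producer has all items available up front: split them into fixed-size chunks and enqueue each chunk as the data of one job. `chunk`, `makeBatchProcessor`, the `items` field and the injected `batchWrite` function are illustrative names, not part of Bull.

```javascript
// Split an array into chunks of at most `size` elements.
function chunk(items, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Producer side: one job per chunk. With Bull this would be something like
//   for (const items of chunk(allItems, 500)) await queue.add({ items });
// Processor side: the whole batch arrives in job.data.items.
// `batchWrite` is a hypothetical bulk-write function, injected for testability.
function makeBatchProcessor(batchWrite) {
  return async function processBatchJob(job) {
    await batchWrite(job.data.items);
    return { written: job.data.items.length };
  };
}
```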
Yes, I'm looking to handle multiple jobs at once. It would be a great feature, and one that many developers want.
I already have another case where I pass all the data as a batch in ONE job, but in my new situation it is very hard to send the data in bulk like before.
@mseld if I understand you correctly, you want the process function to wait until x number of jobs are in the queue and then fetch them all at once in your process function, is that correct?
@manast correct
+1 for that feature, though it would need to be very efficient: anyone using such an approach is most likely adding and processing millions of jobs.
@mseld Maybe it will help you https://github.com/peecky/node-buffering
I have a similar issue: I need to group multiple items before sending them in bulk to a time-series database. The TSDB can handle a huge number of writes/s, but only a few hundred HTTP requests/s.
@yurtaev The package you mentioned is definitely interesting, we could implement something like this
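worker.js
var Buffering = require('node-buffering');

// One buffer per worker process: flush after 1000 items or 1000 ms.
var buffer = new Buffering({sizeThreshold: 1000, timeThreshold: 1000});

buffer.on('flush', function(batch) {
  // process batch here
});

// Flush what is left on shutdown. Wrap the call so `flush` keeps its `this`
// (event listeners are invoked with `this` set to `process`).
function flushBuffer() {
  buffer.flush();
}
process.on('exit', flushBuffer);
process.on('SIGINT', flushBuffer);
process.on('SIGTERM', flushBuffer);

// Sandboxed processor: Bull calls the exported function once per job.
module.exports = function(job, done) {
  buffer.enqueue(job.data);
  done();
};
And call it like this: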
queue.process('./worker')
@manast @weeco what would be the downside of using this approach?
I would like to note that the node-buffering package does not seem to be maintained or well tested, so it's not a production-ready solution.
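If you'd rather not depend on node-buffering, the same size-or-time flushing can be sketched in a few lines of plain JavaScript. `BatchBuffer` and its options are hypothetical names mirroring the snippet above, not node-buffering's actual API.

```javascript
// Collects items and hands them to `onFlush` in batches, flushing when
// `sizeThreshold` items are buffered or `timeThreshold` ms have passed
// since the first item of the current batch was added.
class BatchBuffer {
  constructor({ sizeThreshold, timeThreshold, onFlush }) {
    this.sizeThreshold = sizeThreshold;
    this.timeThreshold = timeThreshold;
    this.onFlush = onFlush;
    this.items = [];
    this.timer = null;
  }

  enqueue(item) {
    this.items.push(item);
    if (this.items.length >= this.sizeThreshold) {
      this.flush();
    } else if (this.timer === null) {
      // Start the time window when the first item of a batch arrives.
      this.timer = setTimeout(() => this.flush(), this.timeThreshold);
    }
  }

  flush() {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.items.length === 0) return;
    const batch = this.items;
    this.items = [];
    this.onFlush(batch);
  }
}
```

In a worker you would call `buffer.enqueue(job.data)` and then `done()`. One downside of this approach is that the job is reported as completed before its data has been flushed, so anything still in the buffer is lost if the worker process dies.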
For the record, an implementation idea:
since the queue's atomicity is based on moving one job at a time, such a bulk approach would probably need to add the jobs as subjobs of a parent job. For instance, the job data structure could have a field called "bulk" where an array of all the subjobs is stored.
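A rough sketch of that idea, assuming the subjobs are known when the parent job is added: the parent's data carries a `bulk` array, and the processor handles each subjob and reports which ones failed instead of failing the whole parent. `makeBulkJobData`, `processBulkJob` and `handleItem` are hypothetical.

```javascript
// Build the data for one parent job whose "bulk" field holds its subjobs.
function makeBulkJobData(subjobs) {
  return { bulk: subjobs.map((data, index) => ({ index, data })) };
}

// Process every subjob of a parent job and report per-subjob outcomes,
// so one bad subjob does not hide the results of the others.
// `handleItem` is a hypothetical async per-item handler supplied by the caller.
async function processBulkJob(job, handleItem) {
  const results = await Promise.allSettled(
    job.data.bulk.map((sub) => handleItem(sub.data))
  );
  const failed = [];
  results.forEach((r, i) => {
    if (r.status === "rejected") {
      failed.push({ index: job.data.bulk[i].index, reason: String(r.reason) });
    }
  });
  return { total: results.length, failed };
}
```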
I have implemented this for my own project now. What I do is take a certain number of jobs from the waiting list whenever a job runs, and then apply them as a batch. I use my implementation like this:
queue.process(async (job) => {
  let { jobs_to_process, waiting } = await batch_jobs({
    job: job,
    batch_size: 200,
    max_timeout: 5 * 60 * 1000,
  });
  let datas = jobs_to_process.map((x) => x.data);
  console.log(`> Processing ${datas.length} jobs`);
  await your_batch_function(datas);
  // I'd rather move them to completed here, but the internal .moveToCompleted gives an error (understandably)
  await Promise.all(waiting.map((x) => x.remove()));
});
and batch_jobs is defined as follows:
let batch_jobs = async ({
  job: actual_active_job,
  batch_size,
  max_timeout,
  poll_time = 5 * 1000, // I default to checking every 5 seconds.. I know polling is mostly bad but eh
}) => {
  // NOTE Why batch_size - 2?
  // .... `queue.getWaiting()` is inclusive, so `0 - n` yields `n + 1` results
  // .... and on top of that we have our current job to run, so we need one fewer
  let waiting = await actual_active_job.queue.getWaiting(0, batch_size - 2);
  let jobs_to_process = [actual_active_job, ...waiting];
  // Return a full batch, or whatever we have once max_timeout is used up
  if (jobs_to_process.length >= batch_size || max_timeout <= 0) {
    return { jobs_to_process, waiting };
  }
  let current_timeout = Math.min(max_timeout, poll_time);
  await new Promise((resolve) => setTimeout(resolve, current_timeout));
  // Poll again with the remaining time budget
  return batch_jobs({
    job: actual_active_job,
    batch_size: batch_size,
    max_timeout: max_timeout - current_timeout,
    poll_time: poll_time,
  });
};
I understand that polling isn't the most performance-friendly approach, but I needed a way to pause the current job until I have enough jobs, while also making sure we aren't stuck on the last 199 if my batch_size is 200. I think this will also work with the threaded processor.
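The polling loop above can be pulled out into a plain function so it can be exercised without Redis. In this sketch `fetchWaiting(n)` and `sleep(ms)` are injected, hypothetical parameters; with Bull, `fetchWaiting(n)` would wrap `queue.getWaiting(0, n - 1)` because the range is inclusive. It returns whatever has accumulated once `maxTimeout` is used up, rather than polling forever.

```javascript
// Poll for waiting jobs until `batchSize` jobs (including the active one)
// are available or `maxTimeout` ms of waiting have been spent.
// fetchWaiting(n) must resolve to at most n waiting jobs.
async function collectBatch({ activeJob, batchSize, maxTimeout, pollTime, fetchWaiting, sleep }) {
  let remaining = maxTimeout;
  for (;;) {
    const waiting = await fetchWaiting(batchSize - 1);
    const jobs = [activeJob, ...waiting];
    if (jobs.length >= batchSize || remaining <= 0) {
      return { jobs, waiting };
    }
    const wait = Math.min(remaining, pollTime);
    await sleep(wait);
    remaining -= wait;
  }
}
```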
I wanted to share my use case and solution and get feedback from people here.
My use case is different in that I have plenty of dynamic batch queues, and I prefer to store the batch in a central DB rather than in memory.
Use case/requirements:
Solution:
2. When processing a 'batch_processing' job:
2.a. If the batch needs to be processed now, i.e. if redis.LLEN(batches:batch_name) >= 1000 or the first job in the Redis list is older than 5 seconds,
then fetch batches of 1,000 and process them, until
2.b. Add a 'delayed_batch_processing' job according to the queue_time of the first element in the list. // Why? Because we pop jobs out of the list, we might be left with some jobs in it, and the condition of 1.b. could get missed.
2.c. removeOnComplete
3. When processing a 'delayed_batch_processing' job:
3.a. Add a 'batch_processing' job with high priority.
3.b. removeOnComplete
// Why not process right away? Because requirement 6 needs all job processing to be locked on the same unique jobId.
// Why high priority? It is fair: it is now time to process the batch, and its jobs were already queued a long time ago.
A repeated delayed_batch_processing job every 5 seconds or less.
// Why? Because adding a delayed job in 1.c. can be ignored if the same jobId is stuck in the waiting queue for a long time.
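Steps 2.a and 2.b can be sketched with an in-memory array standing in for the Redis list `batches:batch_name`; in Redis the check would use LLEN and the first element's timestamp. The function names and the `queuedAt` field are assumptions, and because the end of step 2.a is cut off above, this sketch assumes draining continues while the batch is still due.

```javascript
// In-memory stand-in for the Redis list `batches:<batch_name>`.
// Each entry is { queuedAt: <ms timestamp>, data }.
// Step 2.a: the batch is due if it is large enough or its oldest entry is old enough.
function isBatchDue(list, now, { maxSize = 1000, maxAgeMs = 5000 } = {}) {
  if (list.length === 0) return false;
  return list.length >= maxSize || now - list[0].queuedAt >= maxAgeMs;
}

// Pop and process chunks of up to `maxSize` while the batch remains due.
// Returns the delay (ms) for the follow-up 'delayed_batch_processing' job
// (step 2.b), or null if the list is now empty.
function drainDueBatches(list, now, processBatch, { maxSize = 1000, maxAgeMs = 5000 } = {}) {
  while (isBatchDue(list, now, { maxSize, maxAgeMs })) {
    const chunk = list.splice(0, maxSize); // like LRANGE + LTRIM in Redis
    processBatch(chunk.map((e) => e.data));
  }
  if (list.length === 0) return null;
  return Math.max(0, list[0].queuedAt + maxAgeMs - now);
}
```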
Possible improvements:
@nitzanav thank you for your valuable input.
@manast thank you for this amazing project.
I will share my code when done, but I'd like some feedback before implementing it. Can you share feedback?
@dralletje @mseld @weeco @Overdrivr @yurtaev Can you please give feedback on the design I presented above? Pros/Cons/Challenges I might be running into.
I'm also looking for this.
The simplest possible design I can think of, built on top of atomic job acquisition (i.e., assuming each job is locked as soon as it is acquired), is:
min(job_lock_timeout * 0.05, 30 seconds) seems like a reasonable heuristic, or accept a configuration option. We'll call this batch_timeout.
batch_timeout time has passed. The timeout for the next job acquisition becomes batch_timeout - acquisition_time, where acquisition_time is the amount of time it took to get this job.
The hardest part of batch processing is error reporting. If the batch processing function throws an error, that likely means all the items failed. However, having a way to mark individual items as failed is often useful as well.
This design also means you don't need any special setup or handling on the queue creation side, the extra logic is handled entirely on the processing side. This could manifest as simply as a call to queue.batchProcess(handler) instead of queue.process(handler).
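A sketch of the acquisition loop described above. `acquireNext(timeoutMs)` is a hypothetical stand-in for an atomic "lock and fetch the next job" call (Bull does not expose one under that name), and returning a `Set` of failed indexes from the handler is just one possible way to mark individual items as failed.

```javascript
// Heuristic from the comment above: 5% (1/20) of the lock timeout, capped at 30 s.
function batchTimeout(lockTimeoutMs) {
  return Math.min(lockTimeoutMs / 20, 30000);
}

// Acquire jobs one at a time until `maxSize` jobs are held or the batch
// timeout is used up. `acquireNext(timeoutMs)` is hypothetical: it locks and
// resolves to the next job, or null if none arrives in time.
// `now` is injectable for testing.
async function acquireBatch({ acquireNext, maxSize, timeoutMs, now = Date.now }) {
  const start = now();
  const jobs = [];
  while (jobs.length < maxSize) {
    const remaining = timeoutMs - (now() - start);
    if (remaining <= 0) break;
    const job = await acquireNext(remaining);
    if (job === null) break;
    jobs.push(job);
  }
  return jobs;
}

// Run the batch handler and decide each job's outcome. If the handler throws,
// every job fails; otherwise it may return a Set of indexes that failed.
async function runBatch(jobs, handler) {
  try {
    const failedIndexes = (await handler(jobs.map((j) => j.data))) || new Set();
    return jobs.map((job, i) => ({ job, ok: !failedIndexes.has(i) }));
  } catch (err) {
    return jobs.map((job) => ({ job, ok: false, error: err }));
  }
}
```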
@mscharley
I like the simplicity, I am wondering if this is a fit for my use case.
I have made my design a bit more simple, but the majority is the same.
In addition, I personally have another workaround: I get the jobs from a Kinesis stream, so I can batch them when I insert them. So I might not get to implement this.
In my case the job data contains a "batch_key"; jobs with the same key shall be batched together.
Hundreds/thousands of little batches can be waiting in parallel for the batch timeout; I wonder whether your design works well in this case.
According to your design, once you have processed the entire batch successfully, you need to moveToFinished all the jobs one by one or implement a Lua script to mark them all at once, right?
I wanted to note the differences and similarities between the two solutions.
Both maintain a batch queue, one in Redis and the other in memory.
Both use delayed execution at the batch_timeout, one via insertion of a delayed job, the other via setTimeout.
And here are some more differences:
The Redis queue approach can complete the jobs, since it has a "persisted" delayed job to ensure that the batch will be processed. The memory solution ensures the same thing using the lock mechanism.
The Redis solution keeps distributed batches across worker processes and machines. The memory solution keeps a separate batch in the memory of each worker process.
The format of the previous comment is messed up; I will fix it soon.
is this a possible solution? https://github.com/OptimalBits/bull/issues/1059
It may work, but I'd still personally prefer something built into bull with the support that that would entail. Specifically, I'd be worried about getting the error handling right for batch processing.
+1 for batched jobs.
Our use case: replicating writes from our main DB to our caches.
:+1: too
My use case is multiple indexing requests to Elasticsearch that I'd like to bulk index instead of doing many single document requests...
It's fair to say that if you are streaming data rows and need to process a batch as one unit, Kafka/Kinesis is probably the better fit.
In my use case the batch key was dynamic and unpredictable. My solution was in the application layer: I grouped the queue in the application (using the RxJS group operator and bufferTime), then after processing marked all the jobs as done (acked them).
In practice I migrated to RabbitMQ, but I guess the same is achievable with Bull by using moveToCompleted, or by collecting the "done" callbacks and calling them once you have processed the entire batch.
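The "collect the done callbacks" idea can be sketched without RxJS: group jobs by `job.data.batch_key`, process each group after a fixed window, and only then call each job's `done`. `KeyedBatcher` and `processGroup` are hypothetical names, and the plain `setTimeout` window replaces the RxJS operators mentioned above.

```javascript
// Groups incoming jobs by job.data.batch_key and processes each group once
// `windowMs` has passed since its first job arrived. Each job's `done`
// callback is held until its group has been processed, so a job is only
// acknowledged after its batch succeeds or fails.
class KeyedBatcher {
  constructor({ windowMs, processGroup }) {
    this.windowMs = windowMs;
    this.processGroup = processGroup; // async (key, dataArray) => void
    this.groups = new Map(); // key -> { entries: [{ job, done }], timer }
  }

  // Use as the Bull processor: queue.process(concurrency, (job, done) => batcher.add(job, done))
  add(job, done) {
    const key = job.data.batch_key;
    let group = this.groups.get(key);
    if (!group) {
      group = { entries: [], timer: setTimeout(() => this.flush(key), this.windowMs) };
      this.groups.set(key, group);
    }
    group.entries.push({ job, done });
  }

  async flush(key) {
    const group = this.groups.get(key);
    if (!group) return;
    clearTimeout(group.timer);
    this.groups.delete(key);
    try {
      await this.processGroup(key, group.entries.map((e) => e.job.data));
      group.entries.forEach((e) => e.done());
    } catch (err) {
      group.entries.forEach((e) => e.done(err));
    }
  }
}
```

Because every held job stays active until its batch is processed, the processor's concurrency (e.g. `queue.process(500, handler)`) caps how large a batch can grow.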
Hi Guys!
I'm looking to pause / resume only selective jobs (or a batch of jobs) and not the complete Queue.
With multiple users this use case can't be implemented, because the only functionality I've seen so far is pausing the whole queue: https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queuepause
Pausing the queue stops the complete queue, so if one user pauses his jobs, all users are affected.
Is there a workaround for this?
Also, I feel like this thread is onto something and functions for a batch of jobs would really help many use cases.
I'd like to share mine and get feedback from people here.
I have a microservice architecture.
Use case / Requirement:
1- Talk to three different microservices to get the processing done (output made).
2- Iterate over these three steps n times (depending on the user data).
3- Handle multiple users.
4- Batch-process jobs for users (pause a batch of jobs / selected jobs in the queue, not the complete queue).
Solution:
1- I have grouped all the steps into a single job.
2- A function gets the user data and creates n jobs, one for each of the n data rows.
A named consumer with the @Process decorator gets the data for all three microservices from the job and calls them in order to produce the final output.
3- I declared a single queue onto which the jobs from step 2 are pushed. Users are served on a first-come, first-served basis, so the 2nd user has to wait until all of user 1's jobs have completed.
Currently, I use the pause queue and resume queue functions in order to stop/resume processing.
Now, the 4th requirement is the part which is unhandled.
The scenario would be:
User 1 creates jobs, which get pushed to the queue and are being processed.
User 2 creates jobs...
Now user 1 wants to pause / stop his jobs.
So we need to identify all the jobs in the queue related to that particular user, which can be done using the data object passed to the job (maybe there's a better way).
But how to pause / resume / stop / get progress of that particular batch or user related jobs?
@ahmad-punch pausing only a selection of jobs in a queue is currently impossible, and I have a hard time trying to think how such functionality could be implemented with current data structures. In general what you are suggesting is a pretty hard problem to solve, at least in an efficient way.
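Selective pausing isn't available, but stopping a user's pending work can be approximated by tagging each job with its owner and removing the ones that haven't started. This sketch assumes a hypothetical `userId` field in the job data; with Bull, the candidate jobs could come from `queue.getJobs(['waiting', 'delayed'])`, and "resuming" would mean adding the jobs again.

```javascript
// Assumes each job was added with its owner's id in the data, e.g.
// queue.add({ userId, row }). `userId` is a hypothetical field name.
function jobsForUser(jobs, userId) {
  return jobs.filter((job) => job && job.data && job.data.userId === userId);
}

// "Stop" a user by removing their jobs that have not started yet.
// `jobs` should contain only waiting/delayed jobs; active ones are
// already being processed and are not touched here.
async function stopUser(jobs, userId) {
  const mine = jobsForUser(jobs, userId);
  await Promise.all(mine.map((job) => job.remove()));
  return mine.length;
}
```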
So how should I tackle this problem?
Maybe create a separate queue for every user and destroy it when the jobs are completed or when the user signals that he doesn't want to continue with processing?
But for this, queue creation needs to be dynamic, and queues would also need to be destroyed on demand.
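A minimal sketch of a per-user queue registry, assuming queues are created on first use and closed when a user is done. `PerUserQueues` and the `user-` name prefix are hypothetical; the queue factory is injected, and with Bull it could be `(name) => new Queue(name, redisUrl)`, whose instances provide `pause()`, `resume()` and `close()`.

```javascript
// Keeps one queue per user, created on first use and closed on demand.
// `createQueue(name)` is injected so the registry can be tested without Redis.
class PerUserQueues {
  constructor(createQueue) {
    this.createQueue = createQueue;
    this.queues = new Map();
  }

  get(userId) {
    if (!this.queues.has(userId)) {
      this.queues.set(userId, this.createQueue(`user-${userId}`));
    }
    return this.queues.get(userId);
  }

  pause(userId) {
    return this.get(userId).pause();
  }

  resume(userId) {
    return this.get(userId).resume();
  }

  // Close this user's queue and forget it. Note that close() only releases
  // the connection; it does not delete the queue's data from Redis.
  async destroy(userId) {
    const queue = this.queues.get(userId);
    if (!queue) return false;
    this.queues.delete(userId);
    await queue.close();
    return true;
  }
}
```

Each worker would also need to call `process()` on every user queue it should serve, which is part of what makes this approach heavy with many users.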
Probably Bull is not the right tool for your requirements. Maybe you can build something on top of a SQL database, where you can run more complex queries on the jobs that actually belong to a given customer (and are not paused) and so on.