Bull: Concurrency in sandboxed queue

Created on 16 Aug 2020 · 17 Comments · Source: OptimalBits/bull

I have my queue processing defined as
queue.process('crawl',1,folder_name+'/api/processors/crawl.js');

The thing is, I want this named process, crawl, to run strictly one job at a time. I only have one server running, but I have noticed multiple times that more than one queue item becomes active. I can't have multiple jobs running simultaneously, as each job will interfere with any other running job.

Things that I checked:
1) Did any of the jobs get stuck, and is that why a second job runs? No, none of the jobs were stuck; it just runs multiple jobs.
2) Are there multiple workers? I cross-checked: all the debug output printed to the console turns up on the terminal of the same machine.

It just looks like multiple threads are running. How do I strictly make sure that only one job runs at a time?

What does 1 mean here? Does it represent the number of threads, or the number of tasks that will run concurrently per thread?

Bull version: 3.16

BETTER DOC

All 17 comments

I've just started messing around with this, but from what I've learned, calling "process" multiple times makes that concurrency number stack.

For example, let's say you have an API endpoint that calls process and adds a job when hit. If 3 clients hit that API at once, process will be called 3 times "horizontally" and thus 3 concurrent jobs will be active.

I have a similar requirement when I want to limit the number of active jobs, but haven't found a solution yet. Hopefully, someone else will chime in

If you create a new instance of a Bull queue and define the processor, then that instance is in fact a new worker. The concurrency factor is per worker. If you have 3 workers with concurrency 1 then they will process in parallel following a round robin scheme.

I have the process defined only at one place - which is called on server lift.

But I do create multiple instances of the queue in various controllers (for adding to the queue). If I don't redefine the process again, it should not count as a worker, right?

Also this is a problem that I am experiencing only with sandboxed queues. Regular queues are working fine as expected.

This job takes 3-10 minutes and blocks the event loop (it executes a shell script), so I tried increasing the lockDuration as well. The 2nd job still becomes active instantly.

Are you sure you are returning the correct promise in your processor?

If you can write a simpler processor that reproduces the issue I can look into it.

ok.. I am able to narrow down the problem

    queue.process('crawl',1,folder_name+'/api/processors/crawl.js');
    queue.process('crawl2',1,folder_name+'/api/processors/crawl.js');
    /**
     * crons
     */

    // Repeat check for hung charging sessions  once every hour
    _.forEach(sails.config.bull.repeats, function (task) {
        if (task.active) {
            queue.add(task.name, task.data, { repeat: task.repeat });
            sails.log.info(`bull repeatable job registered: ${task.name}`);
        }
    });


    queue.process('clean_completed_jobs', 1, function(job,done){
        BullService.deleteBullTasks(1000, 'completed')
        done();
    });

    queue.process('clean_failed_jobs', 1, function(job,done){
        BullService.deleteBullTasks(1000, 'failed')
        done();
    });

For the above code, my expectation is that one task of each named job will run at a time. What happens is that 4 instances of crawl get executed simultaneously.

If I change the 1 to 2 in queue.process('crawl',1,folder_name+'/api/processors/crawl.js');, then instead of 2 concurrent crawl jobs I get 5 (i.e. 2+1+1+1). It looks like the other process calls are interfering with each other.

Every process definition increases the number of workers, so in your case you will have 4 workers, one per queue.process call.
You can instead do something like this:

queue.process(1,folder_name+'/api/processors/crawl.js');

and inside crawl.js:

switch (job.name) {
  case "crawl": return doOneThing();
  case "crawl2": return doAnotherThing();
}
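
A slightly fuller sketch of such a crawl.js may help here. The handlers doOneThing and doAnotherThing are the placeholders from the snippet above, stubbed so the example runs on its own; rejecting unknown job names is an assumption about how one might want unexpected jobs to fail, not something Bull requires.

```javascript
// Placeholder handlers (hypothetical): real ones would do the actual work.
async function doOneThing(job) {
  return `crawl handled job ${job.id}`;
}

async function doAnotherThing(job) {
  return `crawl2 handled job ${job.id}`;
}

// Single processor that dispatches on job.name, so the queue needs only one
// process definition and therefore only one worker.
function processJob(job) {
  switch (job.name) {
    case 'crawl':
      return doOneThing(job);
    case 'crawl2':
      return doAnotherThing(job);
    default:
      // Assumption: fail loudly on names this processor does not know.
      return Promise.reject(new Error(`Unknown job name: ${job.name}`));
  }
}

// In a sandboxed processor file this would be exported with:
// module.exports = processJob;
```

Every branch returns a promise, so the worker can tell when the job has really finished or failed.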

Furthermore, please keep your promises in order. It is very important that you do not ignore returned promises, as that creates all kinds of side effects. For example, you call queue.add without waiting for the promise to resolve, and in both cleanup processors you call BullService.deleteBullTasks(...) while also ignoring the result. It is better to write it like this:

 queue.process('clean_completed_jobs', 1, function(job){
   return BullService.deleteBullTasks(1000, 'completed')
 });

i.e., do not use the done callback and instead return a promise.
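
To see why this matters, here is a small sketch that runs without Bull or Redis. The deleteBullTasks function is a hypothetical stand-in for BullService.deleteBullTasks that resolves after a short delay, and the extra log parameter exists only to record the order of events; neither is part of Bull's API.

```javascript
// Hypothetical stand-in for BullService.deleteBullTasks: resolves after a delay.
function deleteBullTasks(limit, status, log) {
  return new Promise((resolve) => {
    setTimeout(() => {
      log.push(`deleted ${status}`);
      resolve(limit);
    }, 10);
  });
}

// Callback style from the thread: done() fires before the deletion has finished.
function callbackProcessor(job, done, log) {
  deleteBullTasks(1000, 'completed', log); // returned promise is ignored
  done();
}

// Promise style: whoever calls the processor can wait for the deletion.
function promiseProcessor(job, log) {
  return deleteBullTasks(1000, 'completed', log);
}

async function demo() {
  const callbackLog = [];
  callbackProcessor({}, () => callbackLog.push('job finished'), callbackLog);
  // Snapshot taken right away: the deletion has not run yet.
  const callbackSnapshot = callbackLog.slice();

  const promiseLog = [];
  await promiseProcessor({}, promiseLog);
  promiseLog.push('job finished');

  return { callbackSnapshot, promiseLog };
}

demo().then((result) => console.log(result));
```

In the callback version the job is reported as finished while the deletion is still pending; in the promise version the deletion is logged first.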

I see ...

So named jobs in a sandboxed queue do not work the same way as regular named jobs?

This creates an issue, because there are some jobs I want to run on a separate thread and some jobs I want to run on the server's thread, as they need the server context.

I also created a repo to replicate this issue:
https://github.com/alexjv89/bull_debug

clone it,
npm install
node app.js - starts the server - assumes redis connection on db 1

localhost:1337/bull/ - view all bull tasks
localhost:1337/bull/active - view all active tasks
localhost:1337/bull/complete - view all complete tasks
localhost:1337/bull/create - create one job - refresh this page a number of times to create multiple jobs.

No matter how many tasks I create, the concurrency is 3 for the crawl job.

How do I call queue.process so that one job runs in the server context (same thread) and another job runs in a separate thread, each independently with concurrency 1? Is that possible?

So named jobs in a sandboxed queue do not work the same way as regular named jobs?

This creates an issue, because there are some jobs I want to run on a separate thread and some jobs I want to run on the server's thread, as they need the server context.

They work the same. For every defined processor you will get another worker.

You are still not handling promises correctly in that codebase. I have no time to debug the code, but not handling promises correctly will result in all kinds of weird behaviour. For example, this pattern repeats all over:

queue.getJobCounts().then(function(counts){
            queue.getRepeatableJobs().then(function(repeats){

You need to return the promises, always:

return queue.getJobCounts().then(function(counts){
    return queue.getRepeatableJobs().then(function(repeats){

You can also use async/await, which makes the code cleaner:

const count = await queue.getJobCounts();
const repeats = await queue.getRepeatableJobs();

// and so on...
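
As a runnable sketch of the same idea: the fakeQueue object below is a hypothetical stand-in so the code works without Redis, and describeQueue is an illustrative name. It relies only on the getJobCounts() and getRepeatableJobs() methods already used in this thread.

```javascript
// Collects both results. Because the function is async it always returns a
// promise, so callers can await it or return it from a processor.
async function describeQueue(queue) {
  const counts = await queue.getJobCounts();
  const repeats = await queue.getRepeatableJobs();
  return { counts, repeatableCount: repeats.length };
}

// Hypothetical stand-in for a Bull queue, with made-up numbers.
const fakeQueue = {
  getJobCounts: async () => ({ waiting: 2, active: 1, completed: 5, failed: 0 }),
  getRepeatableJobs: async () => [{ name: 'clean_completed_jobs' }],
};

describeQueue(fakeQueue).then((summary) => console.log(summary));
```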

Got it.. thank you so much.. Will make the changes.

The relevant documentation for this problem is here: https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queueprocess

A name argument can be provided so that multiple process functions can be defined per queue. A named process will only process jobs that match the given name. However, if you define multiple named process functions in one Queue, the defined concurrency for each process function stacks up for the Queue. See the following examples:

/***
 * For each named processor, concurrency stacks up, so any of these three process functions
 * can run with a concurrency of 125. To avoid this behaviour you need to create an own queue
 * for each process function.
 */
const loadBalancerQueue = new Queue('loadbalancer');
loadBalancerQueue.process('requestProfile', 100, requestProfile);
loadBalancerQueue.process('sendEmail', 25, sendEmail);
loadBalancerQueue.process('sendInvitation', 0, sendInvite);

const profileQueue = new Queue('profile');
// Max concurrency for requestProfile is 100
profileQueue.process('requestProfile', 100, requestProfile);

const emailQueue = new Queue('email');
// Max concurrency for sendEmail is 25
emailQueue.process('sendEmail', 25, sendEmail);
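
The arithmetic behind "stacks up" can be sketched as a simple sum. This assumes concurrency adds up exactly as the documentation comment above describes; stackedConcurrency is an illustrative helper, not a Bull function.

```javascript
// Sums the concurrency of every process definition registered on one queue.
function stackedConcurrency(definitions) {
  return definitions.reduce((total, def) => total + def.concurrency, 0);
}

// The loadBalancerQueue example from the documentation.
const docsExample = [
  { name: 'requestProfile', concurrency: 100 },
  { name: 'sendEmail', concurrency: 25 },
  { name: 'sendInvitation', concurrency: 0 },
];

// The four process calls from the code earlier in this thread.
const threadExample = [
  { name: 'crawl', concurrency: 1 },
  { name: 'crawl2', concurrency: 1 },
  { name: 'clean_completed_jobs', concurrency: 1 },
  { name: 'clean_failed_jobs', concurrency: 1 },
];

// Same as above, but with the crawl concurrency raised to 2.
const threadExampleRaised = threadExample.map((def) =>
  def.name === 'crawl' ? { ...def, concurrency: 2 } : def
);

console.log(stackedConcurrency(docsExample)); // 125
console.log(stackedConcurrency(threadExample)); // 4
console.log(stackedConcurrency(threadExampleRaised)); // 5
```

The last two results match the 4 and 5 simultaneous crawl jobs reported earlier in the thread.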

This is what I did not understand before - for named process the concurrency adds up... will need to refactor how I use the queue..
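
One possible shape for that refactor, following the documentation's advice to create a separate queue per process function. This is a sketch under assumptions: the queue names 'crawl' and 'maintenance', the setUpQueues helper, and the RecordingQueue stand-in are all hypothetical. The Queue constructor is passed in as a parameter so the code can be tried without Redis.

```javascript
// Creates one queue per kind of work so that concurrency no longer stacks.
// QueueClass is expected to behave like Bull's Queue constructor.
function setUpQueues(QueueClass, crawlProcessorPath, maintenanceHandler) {
  // Sandboxed: a file path is given, so Bull runs it outside the server process.
  const crawlQueue = new QueueClass('crawl');
  crawlQueue.process(1, crawlProcessorPath);

  // In-process: a function is given, so it runs in the server's own context.
  const maintenanceQueue = new QueueClass('maintenance');
  maintenanceQueue.process(1, maintenanceHandler);

  return { crawlQueue, maintenanceQueue };
}

// Hypothetical stand-in that only records what was registered.
class RecordingQueue {
  constructor(name) {
    this.name = name;
    this.registered = [];
  }

  process(concurrency, processor) {
    this.registered.push({ concurrency, processor });
  }
}

const queues = setUpQueues(
  RecordingQueue,
  '/path/to/crawl.js', // placeholder path
  async (job) => job.data
);
console.log(queues.crawlQueue.registered.length); // 1
```

With the real library, the first argument would be the constructor returned by require('bull'). Each queue then has a single process definition with concurrency 1, so there is nothing left to stack.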

Just write your processor as I explained above:

switch (job.name) {
  case "crawl": return doOneThing();
  case "crawl2": return doAnotherThing();
}