Hello,
From the docs:
const Queue = require('bull')

/**
 * For each named processor, concurrency stacks up, so any of these three process functions
 * can run with a concurrency of 125. To avoid this behaviour you need to create an own queue
 * for each process function.
 */
const loadBalancerQueue = new Queue('loadbalancer')
// requestProfile, sendEmail and sendInvite are processor functions defined elsewhere
loadBalancerQueue.process('requestProfile', 100, requestProfile)
loadBalancerQueue.process('sendEmail', 25, sendEmail)
loadBalancerQueue.process('sendInvitation', 0, sendInvite)
This seems to be a major drawback, at least for my use case, and I was wondering why there is such a limitation. If it's just a matter of refactoring, could you explain what needs to change for per-processor concurrency to work properly, so I can try to submit a PR?
My use case is one queue per user, and each user queue has multiple named processors. However, I need the concurrency of each named processor to be respected instead of stacking up.
Thank you for your time
I am not sure a consistent behaviour can be defined with a concurrency setting per named job, since you still have only one queue. Let's say you have two named jobs, A with concurrency 1 and B with concurrency 5, and the queue contains something like BAAAAA. Would it be reasonable for B not to be processed until the A jobs have been processed one by one (since the concurrency for A is 1), even though there is spare capacity to process B much earlier?
Hm, I understand what you are saying, but if I had A with concurrency 1 and B with concurrency 5 and I added BAAAAA, I would expect 1 × B and 1 × A to run in parallel. Or maybe it could at least be up to the caller to decide whether the values stack up or not.
One idea for achieving this is an internal queue (just a simple array): after each getNextJob call, check whether the processor for the returned job's name is already at its concurrency limit. If it is, push the job onto the internal queue; otherwise, process it.
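To make the trade-off in the last two comments concrete, here is a toy simulation. It is not Bull's scheduler; it assumes every job takes one time step and that index 0 of the array is the next job to be dequeued (which may not match how Bull actually orders its wait list). With five A jobs ahead of one B, strict in-order dispatch makes B wait behind the A jobs, while skipping ahead (the internal-array idea) lets B start immediately.

```javascript
// Toy model (hypothetical, not Bull's real scheduler) of one worker pulling
// jobs from a single queue with a concurrency limit per job name.
// Assumptions: every job takes exactly one time step, index 0 is the next job
// to be dequeued, and all jobs started in one step finish before the next step.
function simulate(jobs, limits, skipAhead) {
  for (const name of jobs) {
    if (!(limits[name] >= 1)) throw new RangeError(`no positive limit for ${name}`);
  }
  const pending = jobs.map((name, index) => ({ name, index }));
  const startTimes = new Array(jobs.length);
  for (let time = 0; pending.length > 0; time += 1) {
    const running = {}; // jobs started in this step, counted per name
    let i = 0;
    while (i < pending.length) {
      const job = pending[i];
      const used = running[job.name] || 0;
      if (used < limits[job.name]) {
        running[job.name] = used + 1;
        startTimes[job.index] = time;
        pending.splice(i, 1); // the job behind it now sits at index i
      } else if (skipAhead) {
        i += 1; // keep this job buffered and look at the ones behind it
      } else {
        break; // strict order: a blocked job holds up everything behind it
      }
    }
  }
  return startTimes;
}

const jobs = ['A', 'A', 'A', 'A', 'A', 'B'];
const limits = { A: 1, B: 5 };
console.log(simulate(jobs, limits, false)); // [ 0, 1, 2, 3, 4, 4 ]
console.log(simulate(jobs, limits, true));  // [ 0, 1, 2, 3, 4, 0 ]
```

The last entry is B's start step: 4 with strict ordering, 0 when the worker is allowed to skip over blocked jobs. The skip-ahead version is where the robustness concerns come in, since buffered jobs are held by one worker outside the shared queue.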
There are definitely cases where stacking up is not wanted. In my case, I set 2 for A and 2 for B because I do not want more than 2 A jobs running at once: my 'A' processor calls an API underneath and I need to stay under its rate limit. With stacking, if I add 4 A jobs, they all run at the same time and I hit the rate limit.
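One possible stopgap for the rate-limit case, sketched here as a hypothetical helper (limitConcurrency is not part of Bull): wrap the A processor in a small semaphore so that at most N calls run at once, regardless of how many A jobs the queue's stacked concurrency lets through.

```javascript
// Hypothetical helper (not part of Bull): wraps a processor so that at most
// `max` calls run at the same time; extra calls wait in a FIFO array.
function limitConcurrency(processor, max) {
  let active = 0;
  const waiting = [];

  return async function limited(...args) {
    if (active < max) {
      active += 1; // take a free slot immediately
    } else {
      // Wait until a finishing call hands its slot directly to us.
      await new Promise((resolve) => waiting.push(resolve));
    }
    try {
      return await processor(...args);
    } finally {
      const next = waiting.shift();
      if (next) next(); // transfer the slot; `active` stays the same
      else active -= 1;
    }
  };
}

// Demo: four calls, but never more than two inside the processor at once.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
let running = 0;
let peak = 0;
const callApi = limitConcurrency(async (job) => {
  running += 1;
  peak = Math.max(peak, running);
  await sleep(10);
  running -= 1;
  return job.id;
}, 2);

Promise.all([1, 2, 3, 4].map((id) => callApi({ id }))).then((ids) => {
  console.log(ids, 'peak =', peak); // [ 1, 2, 3, 4 ] peak = 2
});
```

With Bull this would be registered as something like queue.process('A', 2, limitConcurrency(aProcessor, 2)), where aProcessor is your own function. The caveat is that jobs waiting inside the wrapper have already been picked up by the worker, so they still occupy slots of the queue's shared concurrency while they wait.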
The behaviour you are requesting basically amounts to having two queues, one for A and one for B. Having internal queues in the worker is very complex if you want to achieve a consistent and robust system...
That's how I am going to solve it for now: I will split the queues. But that means more complex code on my side, since I have to maintain multiple queues for the same user, which in turn makes other things more complex or less performant (e.g. loading the jobs from all queues for a specific user). I have no other option, I'm afraid :( It would have been much cleaner and more elegant with named processors.
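For the split-queue approach, a rough sketch of creating one queue per job type for a given user and reading the jobs back from all of them. The helper names and the user-&lt;id&gt;-&lt;type&gt; naming scheme are my own assumptions; the Bull calls used are new Queue(name), queue.process(concurrency, handler) and queue.getJobs(types). The Queue class is passed in so the helpers can be tried without Redis.

```javascript
// Sketch: one queue per (user, job type), so each type gets its own concurrency.
// `QueueCtor` is expected to be Bull's Queue class; the naming scheme is hypothetical.
function createUserQueues(QueueCtor, userId, processors) {
  const queues = {};
  for (const [jobType, { concurrency, handler }] of Object.entries(processors)) {
    const queue = new QueueCtor(`user-${userId}-${jobType}`);
    // Unnamed processor: this concurrency applies to this queue only.
    queue.process(concurrency, handler);
    queues[jobType] = queue;
  }
  return queues;
}

// Collect jobs in the given states from every queue that belongs to one user.
async function getAllUserJobs(queues, states = ['waiting', 'active', 'delayed']) {
  const perQueue = await Promise.all(
    Object.values(queues).map((queue) => queue.getJobs(states))
  );
  return perQueue.flat();
}
```

With Bull this would be called as createUserQueues(require('bull'), userId, { A: { concurrency: 2, handler: aProcessor }, B: { concurrency: 2, handler: bProcessor } }). As far as I know, each Bull queue opens its own Redis connections, so the connection count grows with users × job types; Bull's createClient queue option is the documented way to reuse connections.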
It would have been nice if the concurrency were respected somehow. The internal queue (a simple array) didn't sound too bad in my head, but I haven't tried implementing it in the library, so perhaps you are right about the complexity. I will definitely give it a shot when I have some spare time and see how it works out.
Perhaps, though, the concurrency argument for named processors could be removed, which would also simplify the documentation, in favour of a Queue.prototype.setConcurrency() method or a constructor option.
I find that more readable than doing this:
loadBalancerQueue.process('requestProfile', 100, requestProfile)
loadBalancerQueue.process('sendEmail', 25, sendEmail)
loadBalancerQueue.process('sendInvitation', 0, sendInvite)
The 0 would make someone unfamiliar with the library scratch their head, and they would certainly not work out that the values above stack up. But if you do this:
loadBalancerQueue.setConcurrency(125);
loadBalancerQueue.process('requestProfile', requestProfile)
loadBalancerQueue.process('sendEmail', sendEmail)
loadBalancerQueue.process('sendInvitation', sendInvite)
I think it is clearer that the 125 applies to the whole queue. Just a thought :)
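Until something like setConcurrency() exists (it is only a proposal here), the same readability can be approximated on top of the current queue.process(name, concurrency, handler) signature. The helper below is hypothetical: because named-processor concurrency stacks, it gives the whole budget to the first name and 0 to the rest, so the queue-wide total is written in one place.

```javascript
// Hypothetical helper: emulates a queue-wide concurrency setting using the
// existing named-processor API, relying on per-name values stacking up.
function processAll(queue, totalConcurrency, handlers) {
  Object.entries(handlers).forEach(([name, handler], index) => {
    queue.process(name, index === 0 ? totalConcurrency : 0, handler);
  });
}

// Demo with a stand-in object that only logs what would be registered.
const loggingQueue = { process: (name, concurrency) => console.log(name, concurrency) };
processAll(loggingQueue, 125, { requestProfile() {}, sendEmail() {}, sendInvitation() {} });
// requestProfile 125
// sendEmail 0
// sendInvitation 0
```

With a real queue this would read processAll(loadBalancerQueue, 125, { requestProfile, sendEmail, sendInvitation: sendInvite }).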
For bull 4.0 we are going to rewrite it in TS, and the whole configuration of queues and process will be much cleaner.
Hey! First of all, great work on Bull folks!
I don't understand the point of setting concurrency on each process call if the values are going to stack up. The number you set on each of them doesn't really matter, right?
In my use case, I have three different jobs that should never run at the same time, so I want a queue with a concurrency of 1 at any given time. Correct me if I'm wrong, but as things stand I would have to do something like this:
queue.process('a', 1, aJob)
queue.process('b', 0, bJob)
queue.process('c', 0, cJob)
So, the queue doesn't have a per-process concurrency setting. It only has a global setting for the queue that's a sum of the individual concurrency settings for each process.
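The stacking rule is just a sum, which can be checked against both examples in this thread: the docs' 100 + 25 + 0 and the 1 + 0 + 0 registration above.

```javascript
// Worked arithmetic for the stacking rule: the limit shared by all named
// processors on one queue is the sum of the per-name concurrency values.
const effectiveConcurrency = (registrations) =>
  registrations.reduce((total, [, concurrency]) => total + concurrency, 0);

console.log(effectiveConcurrency([['requestProfile', 100], ['sendEmail', 25], ['sendInvitation', 0]])); // 125
console.log(effectiveConcurrency([['a', 1], ['b', 0], ['c', 0]])); // 1
```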
Maybe I'm missing something, but I find that unintuitive and confusing. Is there a use case I'm missing here, or some other configuration option I'm not aware of?
@limoragni your only option at the moment to make it work is to use separate queues per job type :( It was a major pain point for my use case as well.
@limoragni In newer versions we will move the concurrency level to the queue constructor instead. As you say it is confusing, but there is no real way to have finer concurrency within one queue.