Without rate limiter
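const Bull = require("bull");

const queue = new Bull("test", {
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: true,
  },
  // Uncomment to enable the rate limiter (1 job per 1000 ms):
  // limiter: {
  //   duration: 1000,
  //   max: 1,
  // }
});

// Log the id of every job as it is processed.
queue.process(async (job, done) => {
  console.log(job.data.id);
  done();
});

(async function () {
  // Add two batches of 100 jobs; await each add so insertion order is deterministic.
  for (let i = 0; i < 100; ++i) {
    await queue.add({id: i});
  }
  for (let i = 0; i < 100; ++i) {
    await queue.add({id: i});
  }
})();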
processes jobs in correct order:
0 1 2 3 4 5 6 7 8 ... 99 0 1 2 3 4 5 6 7 8 9 10 ... 99
With rate limiter
processes jobs in random order:
0 74 9 1 65 62 3 43 7 16 44 46 ...
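For reference, the "with rate limiter" run presumably uses the same snippet with the commented-out `limiter` block enabled. A minimal sketch of those options follows; the variable name `limitedQueueOptions` is hypothetical and only used for illustration.

```javascript
// Same options as in the snippet above, with the limiter block enabled.
// `limitedQueueOptions` is a hypothetical name, not something from Bull.
const limitedQueueOptions = {
  defaultJobOptions: {
    removeOnComplete: true,
    removeOnFail: true,
  },
  limiter: {
    max: 1, // at most one job...
    duration: 1000, // ...per 1000 ms
  },
};

// Assumed usage (needs the `bull` package and a running Redis instance):
// const queue = new Bull("test", limitedQueueOptions);
```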
I guess this is due to the way it's designed (maybe some Redis limitation?), but is it expected behavior? Is there a way to use the rate limiter AND still process jobs in the order they were added to the queue?
Hmm, when the rate exceeds the limit the jobs are delayed, so it could be difficult to guarantee order. However, I think the order should not be completely random; maybe we are doing something wrong.
@manast Is there a way to not put the jobs into the delayed queue when the rate limiter is hit? I just ran into this problem, as we don't process the jobs in the bullwhip app instance itself.
@aleccool213 the rate limiting is implemented by delaying the jobs when the limit is hit, so no, it is not possible to avoid the delay. However I will take a look into why the jobs are so randomized, maybe it is possible to provide order guarantees but I have to study the code before I know.
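To illustrate what "delaying the jobs when the limit is hit" can mean, here is a toy fixed-window limiter. It is only a sketch under stated assumptions and does not reproduce the logic of Bull's Lua scripts; `createWindowLimiter` and `tryAcquire` are hypothetical names, and time is passed in explicitly so the example stays deterministic.

```javascript
// Toy model only (NOT Bull's implementation): allow `max` jobs per `duration` ms
// and report how long any extra job would have to be delayed.
function createWindowLimiter({ max, duration }) {
  let windowStart = -Infinity; // start time of the current window
  let count = 0; // jobs already allowed in the current window

  return function tryAcquire(now) {
    // Open a new window once the previous one has expired.
    if (now - windowStart >= duration) {
      windowStart = now;
      count = 0;
    }
    if (count < max) {
      count += 1;
      return { allowed: true, delay: 0 };
    }
    // Limit hit: the job must wait until the current window ends.
    return { allowed: false, delay: windowStart + duration - now };
  };
}

const tryAcquire = createWindowLimiter({ max: 1, duration: 1000 });
for (const now of [0, 100, 200]) {
  const { allowed, delay } = tryAcquire(now);
  console.log(`t=${now}: allowed=${allowed}, readyAt=${now + delay}`);
}
// prints:
// t=0: allowed=true, readyAt=0
// t=100: allowed=false, readyAt=1000
// t=200: allowed=false, readyAt=1000
```

In this toy model the two delayed jobs become ready at the same instant, so something other than the delay has to decide which runs first. Whether that is what happens inside Bull is not established here.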
@manast As my use case is to not define a process method for queues, it looks like the delayed jobs which are put there by rate limiting will never get worked on. I need to have an option in the queue config to turn this off. The queue that hits the rate limit can just not do anything. Can you point me to where in the code it decides to put the job into the delayed queue? I can make a PR.
@aleccool213 the rate limiter is implemented in the moveToActive lua script: https://github.com/OptimalBits/bull/blob/master/lib/commands/moveToActive-8.lua
Time to learn Lua 💃
Running into this as well. Using the limiter essentially ignores priority. Without the limiter, priority determines the order pretty deterministically. But with the limiter set to something like 1 job every 1500ms, the jobs are processed in seemingly random order.
I just want it to pull from the queue based on priority. Perhaps if they are pushed to delay that delay amount could be based on priority at the time?
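The idea of basing the delay on priority could look roughly like the sketch below. This is purely hypothetical: `assignSlotsByPriority` is an invented name, nothing like it is known to exist in Bull, and the sketch assumes all jobs are known up front and that a lower priority number means higher priority.

```javascript
// Hypothetical sketch: give each job a delay slot in priority order,
// breaking ties by the order in which the jobs were added.
function assignSlotsByPriority(jobs, { max, duration }) {
  const ordered = jobs
    .map((job, seq) => ({ ...job, seq })) // remember insertion order
    .sort((a, b) => a.priority - b.priority || a.seq - b.seq);

  return ordered.map((job, index) => ({
    id: job.id,
    priority: job.priority,
    // The first `max` jobs run immediately, the next `max` one window later, etc.
    delay: Math.floor(index / max) * duration,
  }));
}

const plan = assignSlotsByPriority(
  [
    { id: "a", priority: 3 },
    { id: "b", priority: 1 },
    { id: "c", priority: 2 },
  ],
  { max: 1, duration: 1500 }
);
console.log(plan.map((p) => `${p.id}:${p.delay}`).join(" "));
// prints "b:0 c:1500 a:3000"
```

Note that in this sketch a job added later with a higher priority would require recomputing the delays of jobs that are already parked, which hints at why the suggestion may not be simple to implement.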
It is a bug. The limiter puts the jobs with an increasing delay, so something is not being done correctly.
I'm running into the same issue: I want to use the rate limiter and priority on the same queue, but it's not working. Is there any progress? I was thinking of using "bounceBack: true" in the hope that it would solve it, but I'm not sure I fully understand it.
@manast Is this bug present in BullMQ or is it somehow addressed there?
EDIT - Confirmed that BullMQ has the same problem: https://github.com/taskforcesh/bullmq/issues/214
Here is a full test case that can be added to the test_rate_limiter test suite to demonstrate the rate limiting/priority problem:
it('should obey job priority', done => {
  const newQueue = utils.buildQueue('test rate limiter', {
    limiter: {
      max: 1,
      duration: 150
    }
  });
  const numJobs = 100;
  // Number of jobs still waiting to be processed, per priority.
  const priorityBuckets = {
    1: 0,
    2: 0,
    3: 0,
    4: 0
  };

  const promises = [newQueue.pause()];

  for (let i = 0; i < numJobs; i++) {
    const opts = (() => {
      if (i % 5 === 0) {
        return { priority: 1 };
      } else if (i % 3 === 0) {
        return { priority: 2 };
      } else if (i % 2 === 0) {
        return { priority: 3 };
      } else {
        return { priority: 4 };
      }
    })();
    priorityBuckets[opts.priority] = priorityBuckets[opts.priority] + 1;
    promises.push(newQueue.add({ id: i }, opts));
  }

  // Snapshot of the initial bucket counts, used only in the error message.
  const priorityBucketsBefore = _.reduce(priorityBuckets, (acc, value, key) => {
    acc[key] = value;
    return acc;
  }, {});

  promises.push(newQueue.resume());

  Promise.all(promises).then(() => {
    newQueue.process(job => {
      const priority = job.opts.priority;
      priorityBuckets[priority] = priorityBuckets[priority] - 1;

      // Fail if any job with a higher priority (lower number) is still pending.
      for (let p = 1; p < priority; p++) {
        if (priorityBuckets[p] > 0) {
          const before = JSON.stringify(priorityBucketsBefore);
          const after = JSON.stringify(priorityBuckets);
          done(new Error(
            `Priority was not enforced, job with priority ${priority} was processed before all jobs with priority ${p} were processed. Bucket counts before: ${before} / after: ${after}`,
          ));
          return Promise.reject();
        }
      }
      return Promise.resolve();
    });
  });

  // The 'failed' event passes the job first and the error second.
  newQueue.on('failed', (job, err) => {
    done(err);
  });

  // Create the counter once, outside the handler, so it actually fires
  // after `numJobs` completions.
  const afterNumJobs = _.after(numJobs, () => {
    try {
      expect(_.every(priorityBuckets, (value) => value === 0)).to.eq(true);
      done();
    } catch (err) {
      done(err);
    }
  });

  newQueue.on('completed', () => {
    afterNumJobs();
  });
}).timeout(20000)
This finally ended up on my desk; I will update you with my findings.
This has been partially resolved with this PR: https://github.com/OptimalBits/bull/pull/1816
However, order will only be kept on a best-effort basis. With the current design it is impossible to give guarantees about the order of rate-limited jobs. Fixing this will require some design changes that will probably be implemented only in BullMQ, but it will not be high priority right now since it is anything but trivial to fix.
closing since no better solution will be available for some time.
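Given that ordering is only best effort with the built-in limiter, one workaround that might fit single-worker setups is to leave `limiter` unset and throttle inside the processor instead, since the thread reports that order is kept when no limiter is configured. The sketch below is an assumption-laden illustration, not something proposed by the maintainers: `msUntilNextStart` is a hypothetical helper, and with several workers the rate would apply per worker rather than globally.

```javascript
// Returns how long a single worker should wait before starting the next job
// so that job starts are at least `minIntervalMs` apart.
function msUntilNextStart(lastStartMs, nowMs, minIntervalMs) {
  if (lastStartMs === null) return 0; // nothing has run yet
  return Math.max(0, lastStartMs + minIntervalMs - nowMs);
}

// Hypothetical wiring with Bull (needs Redis, so it is left commented out):
// let lastStart = null;
// queue.process(1, async (job) => {
//   const wait = msUntilNextStart(lastStart, Date.now(), 1000);
//   if (wait > 0) await new Promise((resolve) => setTimeout(resolve, wait));
//   lastStart = Date.now();
//   console.log(job.data.id);
// });

console.log(msUntilNextStart(null, 5000, 1000)); // prints 0
console.log(msUntilNextStart(5000, 5400, 1000)); // prints 600
console.log(msUntilNextStart(5000, 7000, 1000)); // prints 0
```

Because the job stays active while the worker waits, this does not help setups where jobs are consumed outside the process that owns the queue.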
@manast I tried this out and am wondering about the following scenario:
{ max: 1, duration: 500 }
Result:
Expectation:
Is this the part that is impossible to fix without a major design change?
that sounds a bit unexpected, could you prepare that case for me so that I can reproduce and check what is going on?
@manast Sure, I modified the existing skipped test to demonstrate this issue. The initial jobs that are queued have priorities 2-5. The job that is added later, after 25 jobs have processed, has a priority of 1. Every time I run this, the priority 1 job is executed last:
it.only('should obey job priority', async () => {
  const newQueue = utils.buildQueue('test rate limiter', {
    limiter: {
      max: 1,
      duration: 500
    }
  });
  const numJobs = 50;
  const priorityBuckets = {
    1: 0,
    2: 0,
    3: 0,
    4: 0,
    5: 0
  };
  const numPriorities = Object.keys(priorityBuckets).length-1;
  const processed = [];
  let canQueueLateJob = true;
  let lateJobQueued = false;

  newQueue.process(job => {
    const priority = job.opts.priority;
    priorityBuckets[priority] = priorityBuckets[priority] - 1;
    processed.push({ id: job.data.id, priority: priority });
    console.log(`Processed: ${job.data.id}, Priority: ${priority}`);

    if (canQueueLateJob && processed.length === 25) {
      canQueueLateJob = false;
      newQueue.add({ id: "newJob" }, { priority: 1 })
        .then(() => {
          lateJobQueued = true;
          console.log("Added new job");
        })
    }

    // for (let p = 1; p < priority; p++) {
    //   if (priorityBuckets[p] > 0) {
    //     const before = JSON.stringify(priorityBucketsBefore);
    //     const after = JSON.stringify(priorityBuckets);
    //     throw new Error(
    //       `Priority was not enforced, job with priority ${priority} was processed before all jobs with priority ${p} were processed. Bucket counts before: ${before} / after: ${after}`
    //     );
    //   }
    // }
  });

  const result = new Promise((resolve, reject) => {
    newQueue.on('failed', (job, err) => {
      reject(err);
    });

    const afterNumJobs = _.after(numJobs + 1, () => {
      try {
        //expect(_.every(priorityBuckets, value => value === 0)).to.eq(true);
        expect(lateJobQueued).to.eq(true);
        expect(processed.length).to.eq(numJobs + 1);
        expect(processed[processed.length - 1].id).not.to.eq("newJob");
        expect(processed[processed.length - 1].priority).to.gt(1);
        resolve();
      } catch (err) {
        reject(err);
      }
    });

    newQueue.on('completed', () => {
      afterNumJobs();
    });
  });

  await newQueue.pause();

  const promises = [];
  for (let i = 0; i < numJobs; i++) {
    const opts = { priority: (i % numPriorities) + 2 };
    priorityBuckets[opts.priority] = priorityBuckets[opts.priority] + 1;
    promises.push(newQueue.add({ id: i }, opts));
  }

  const priorityBucketsBefore = _.reduce(
    priorityBuckets,
    (acc, value, key) => {
      acc[key] = value;
      return acc;
    },
    {}
  );

  await Promise.all(promises);
  await newQueue.resume();

  return result;
}).timeout(60000);
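The test above only asserts that the late job is not executed last. A small helper like the following could turn the `processed` log into a number that is easier to compare between runs. It is a hypothetical sketch, not part of Bull's test suite; `lateJobLag` and its parameters are invented names.

```javascript
// `processed` mirrors the array built in the test: one { id, priority } entry
// per job, in execution order. `queuedAfter` is how many jobs had already been
// processed when the late job was added.
function lateJobLag(processed, lateId, queuedAfter) {
  const position = processed.findIndex((entry) => entry.id === lateId);
  if (position === -1) return null; // the late job never ran
  // Number of other jobs that ran between queuing and executing the late job.
  return position - queuedAfter;
}

const sample = [
  { id: 0, priority: 2 },
  { id: 1, priority: 3 },
  { id: 2, priority: 4 },
  { id: "newJob", priority: 1 },
];
console.log(lateJobLag(sample, "newJob", 1)); // prints 2
```

With the numbers reported above (50 initial jobs, the late job added after 25 and executed last), this helper would return 25, whereas strict priority ordering would give a value close to 0.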