Version 3.3.0
There's something quite wrong with the queue pause functionality that will cause it to take a significant amount of time to pause a queue. This has been causing huge problems as I've been trying to use Bull and test my processor functions.
After pulling out my hair all day on this, I finally have an example of the problem. You can see that simply having some sort of delay after the job completes (I had a database lookup in my code) causes the pause to be significantly delayed. For me, it takes an additional 5 seconds to pause when there is a delay (Mocha tests fail after 2 seconds by default).
Here is the Mocha test script:
var Queue = require('bull');
// Reproduction suite: times queue.pause(true) after the queue's only job has
// completed, once with a 100ms idle gap before pausing and once without.
describe('Pause', function() {
  // Adds one job, waits for it to complete, idles for 100ms, then times the pause.
  it('does stuff slowly', function() {
    this.timeout(10000);
    const queue = new Queue('test', { prefix: '{bull}' });
    const jobOptions = { removeOnComplete: true, removeOnFail: false };

    return queue.add({ foo: 'bar' }, jobOptions).then(() => {
      // Settles on the first 'completed' or 'failed' event emitted by the queue.
      const settled = new Promise((resolve, reject) => {
        queue.on('completed', (job, value) => {
          resolve({job, value});
        });
        queue.on('failed', (job, value) => {
          reject(new Error(job.stacktrace));
        });
      });

      // Listeners are registered before the processor starts.
      queue.process((job) => Promise.resolve());

      return settled.then(() => {
        // 100ms delay to simulate other actions being taken
        const delay = new Promise((resolve) => {
          setTimeout(resolve, 100);
        });

        return delay.then(() => {
          console.log('this is reached quickly');
          const start = Date.now();
          return queue.pause(true).finally(() => {
            const finish = Date.now();
            console.log('this takes a really really long time:', finish - start, 'ms');
          });
        });
      });
    });
  });

  // Same flow as above, but pauses immediately once the job has completed.
  it('does stuff quickly', function() {
    this.timeout(10000);
    const queue = new Queue('test', { prefix: '{bull}' });
    const jobOptions = { removeOnComplete: true, removeOnFail: false };

    return queue.add({ foo: 'bar' }, jobOptions).then(() => {
      // Settles on the first 'completed' or 'failed' event emitted by the queue.
      const settled = new Promise((resolve, reject) => {
        queue.on('completed', (job, value) => {
          resolve({job, value});
        });
        queue.on('failed', (job, value) => {
          reject(new Error(job.stacktrace));
        });
      });

      // Listeners are registered before the processor starts.
      queue.process((job) => Promise.resolve());

      return settled.then(() => {
        // No 100ms delay this time
        console.log('this is reached quickly');
        const start = Date.now();
        return queue.pause(true).finally(() => {
          const finish = Date.now();
          console.log('this also is reached quickly:', finish - start, 'ms');
        });
      });
    });
  });
});
Output:
Pause
this is reached quickly
this takes a really really long time: 5043 ms
✓ does stuff slowly (5186ms)
this is reached quickly
this also is reached quickly: 1 ms
✓ does stuff quickly
2 passing (5s)
I should also add that this timing gets worse with each increase in concurrency.
I will take a look into this later. It could be related to this: "A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized."
https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queuepause
While these queues have nothing in them at all when pausing is attempted, it is possible the issue in the codebase is related to that feature. My gut says something to do with the whenCurrentJobsFinished function is causing the slowdown.
EDIT: It seems the processing promises aren't being resolved in time
/**
 * Debug-instrumented variant of whenCurrentJobsFinished.
 *
 * Logs the contents of `this.processing`, waits for `Promise.all` over those
 * promises to settle, then logs the elapsed time in milliseconds.
 *
 * @returns {Promise} A promise that resolves (with no value) once the
 *   `Promise.all` over `this.processing` has settled.
 */
Queue.prototype.whenCurrentJobsFinished = function() {
  const queue = this;
  console.log(queue.processing);
  const startedAt = Date.now();

  // The outer promise only ever resolves; it never forwards a value or a rejection.
  return new Promise((done) => {
    Promise.all(queue.processing).finally(() => {
      console.log(Date.now() - startedAt);
      done();
    });
  });
};
Pause
this is reached quickly
[ Promise {
_bitField: 1,
_fulfillmentHandler0: undefined,
_rejectionHandler0: [Function],
_promise0:
Promise {
_bitField: 0,
_fulfillmentHandler0: undefined,
_rejectionHandler0: undefined,
_promise0: undefined,
_receiver0: undefined },
_receiver0: {} } ]
5072
this takes a really really long time: 5076 ms
✓ does stuff slowly (5199ms)
this is reached quickly
[ Promise {
_bitField: 1,
_fulfillmentHandler0: undefined,
_rejectionHandler0: [Function],
_promise0:
Promise {
_bitField: 0,
_fulfillmentHandler0: undefined,
_rejectionHandler0: undefined,
_promise0: undefined,
_receiver0: undefined },
_receiver0: {} } ]
0
this also is reached quickly: 1 ms
✓ does stuff quickly
I've simplified the test cases to make it easier to debug. I should also note that the promise from queue.process() never seems to resolve (at least in a timely manner) and might be related to this issue.
const Queue = require('bull');
// Simplified reproduction: no jobs are ever added; each case only varies
// whether a processor is registered and whether there is a 100ms gap before
// queue.pause(true) is called.
describe('Pause', function() {
  // Resolves after `ms` milliseconds; stands in for unrelated work.
  const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  // Logs the marker line, calls queue.pause(true), and once that settles logs
  // `label` followed by the elapsed milliseconds. Returns the pause promise.
  const timePause = (queue, label) => {
    console.log('this is reached quickly');
    const startedAt = Date.now();
    return queue.pause(true).finally(() => {
      const finishedAt = Date.now();
      console.log(label, finishedAt - startedAt, 'ms');
    });
  };

  it('does stuff slowly', function() {
    this.timeout(10000);
    const queue = new Queue('test');
    queue.process((job) => Promise.resolve());
    // 100ms delay to simulate other actions being taken
    return wait(100).then(() =>
      timePause(queue, 'this takes a really really long time:')
    );
  });

  it('does stuff quickly', function() {
    this.timeout(10000);
    const queue = new Queue('test');
    queue.process((job) => Promise.resolve());
    // No 100ms delay this time
    return timePause(queue, 'this also is reached quickly:');
  });

  it('still does stuff quickly', function() {
    this.timeout(10000);
    const queue = new Queue('test');
    // No queue.process call
    // 100ms delay to simulate other actions being taken
    return wait(100).then(() =>
      timePause(queue, 'this also is reached quickly:')
    );
  });
});
Pause
this is reached quickly
this takes a really really long time: 5184 ms
✓ does stuff slowly (5291ms)
this is reached quickly
this also is reached quickly: 2 ms
✓ does stuff quickly
this is reached quickly
this also is reached quickly: 0 ms
✓ still does stuff quickly (106ms)
OK, I see why this is happening. The issue is that if there are no jobs to be processed, the internal variable queue##processing holds the promises to queue##getNextJob, but since there aren't any jobs, it waits 5 seconds, which is the default timeout before a queue tries to get the next job again. I will need to see how this can be fixed in a proper way, but at least we now know why it happens.
That makes sense why that would happen. I guess the pause() call will need to check to see if we're in that waiting state and cancel the timeout.
As I mentioned earlier, increasing the concurrency also multiplies the delay, so that might be a semi-related issue.
// Same scenario as the slow single-worker case, but the processor is
// registered with a concurrency of 2. Intended to live inside the
// describe('Pause', ...) suite alongside the other cases.
it('does stuff even slower with higher concurrency', function() {
  // Longer timeout than the other cases: the observed pause took over 10 seconds.
  this.timeout(11000);
  const queue = new Queue('test');
  queue.process(2, job => Promise.resolve());
  // 100ms delay to simulate other actions being taken
  const delay = new Promise(resolve => setTimeout(resolve, 100));
  return delay.then(function() {
    console.log('this is reached quickly');
    const start = new Date().getTime();
    return queue.pause(true).finally(function() {
      const finish = new Date().getTime();
      console.log('this takes double the slowness of single concurrency', finish - start, 'ms');
    });
  });
});
this is reached quickly
this takes double the slowness of single concurrency 10506 ms
✓ does stuff even slower with higher concurrency (10608ms)
You can see that it's just 5 seconds times the concurrency for the delay (I've checked that this holds true for higher concurrency). I wasn't expecting each worker to have to start its 5 second wait after the previous one's wait though, so I think that might be a bug too.
Many thanks! No more delays with 3.3.6.