When we create delayed jobs in a queue from a producer application and then try to promote them from a consumer application, we get an error.
The first script produces some delayed jobs:
const Queue = require('bull');
const testQueue = new Queue('testqueue', 'redis://127.0.0.1:6379');
testQueue.process((job, done) => {
  console.log('Executing Job', job.data);
  done();
});
const init = async () => {
  await testQueue.add({ test: 'test1' }, {
    delay: 1000000,
    attempts: 2,
    timeout: 10,
    removeOnComplete: true,
    removeOnFail: true
  });
};
init();
The second script reads the delayed jobs and promotes them so that they are executed:
const Queue = require('bull');
const testQueue = new Queue('testqueue', 'redis://127.0.0.1:6379');
testQueue.process((job, done) => {
  console.log('Executing Job', job.data);
  done();
});
const init = async () => {
  // assume that there is another place where we initialise
  // some delayed jobs and there is at least one of them in
  // the results of getDelayed
  const jobs = await testQueue.getDelayed();
  for (let i = 0; i < jobs.length; i++) {
    await jobs[i].promote();
  }
};
init();
(node:13481) UnhandledPromiseRejectionWarning: TypeError: queue.client.promote is not a function
    at Object.promote (/Users/ftsokos/mainProj/bull/lib/scripts.js:342:25)
    at Job.promote (/Users/ftsokos/mainProj/bull/lib/job.js:281:20)
    at init (/Users/ftsokos/mainProj/code_samples/bull.sample.js:11:17)
    at <anonymous>
    at process._tickCallback (internal/process/next_tick.js:188:7)
(node:13481) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:13481) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
After looking into the code, I believe the problem is that the queue has not yet been
fully initialised when we try to call the promote function, so queue.client.promote is still undefined.
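To illustrate the suspected race, here is a toy model that does not use Bull at all. `FakeQueue` and its `client` object are hypothetical stand-ins, and the assumption that the `promote` command is attached asynchronously during initialisation is my reading of the error, not something confirmed from Bull's internals. It only shows why calling the command too early gives a TypeError, while awaiting `isReady()` first avoids it.

```javascript
// Toy model only: FakeQueue is a hypothetical stand-in, not a Bull class.
class FakeQueue {
  constructor() {
    // The client starts out without a `promote` command.
    this.client = {};
    // Attach the command asynchronously, as an initialisation step might.
    this.ready = new Promise(resolve => {
      setImmediate(() => {
        this.client.promote = async jobId => `promoted ${jobId}`;
        resolve(this);
      });
    });
  }

  // Resolves once the command has been attached.
  isReady() {
    return this.ready;
  }
}

async function demo() {
  const queue = new FakeQueue();
  let earlyError = null;
  try {
    // Called before initialisation has finished: `promote` is undefined here.
    await queue.client.promote('1');
  } catch (err) {
    earlyError = err;
  }

  // Waiting for readiness first makes the same call succeed.
  await queue.isReady();
  const result = await queue.client.promote('1');
  return { earlyError, result };
}

demo().then(({ earlyError, result }) => {
  console.log('before ready:', earlyError && earlyError.name);
  console.log('after ready:', result);
});
```

If the real queue behaves like this model, any code path that touches the client right after constructing the queue would need to wait for readiness first.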
To fix this issue, I changed the Job.prototype.promote function (lib/job.js:280) so that it waits for the queue to be ready before promoting:
Job.prototype.promote = function() {
  const queue = this.queue;
  const jobId = this.id;
  return this.queue.isReady().then(() =>
    scripts.promote(queue, jobId).then(result => {
      if (result === -1) {
        throw new Error('Job ' + jobId + ' is not in a delayed state');
      }
    })
  );
};
If you want me to, I can open a PR to fix the issue.
However, if this is the expected behaviour or I am doing something wrong, please
let me know :)
Bull version: 3.7.0
The issue happens only when the consumer has just been started, because the queue has not finished initialising at that point.
Do we have a PR for this?
Hello, I just created a PR for this: https://github.com/OptimalBits/bull/pull/1619 :)
Until a new version is cut, I worked around this in my code by waiting for the queue before promoting:
// `job` is a Job instance obtained elsewhere, e.g. from queue.getDelayed()
await job.queue.isReady();
await job.promote();
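Applied to the consumer script from the original report, the same workaround could look roughly like the sketch below. `promoteAllDelayed` is a hypothetical helper name, not part of Bull. It only relies on the `isReady()`, `getDelayed()` and `promote()` calls already used in this thread, and it takes the queue as a parameter so it can be tried against any object with those methods.

```javascript
// Hypothetical helper (not part of Bull): waits for the queue to be ready,
// then promotes every delayed job one at a time.
async function promoteAllDelayed(queue) {
  await queue.isReady();
  const jobs = await queue.getDelayed();
  for (const job of jobs) {
    await job.promote();
  }
  // Return how many jobs were promoted, for logging.
  return jobs.length;
}

// Usage with Bull (assumes Redis is reachable at the URL used in the scripts above):
// const Queue = require('bull');
// const testQueue = new Queue('testqueue', 'redis://127.0.0.1:6379');
// promoteAllDelayed(testQueue)
//   .then(count => console.log('Promoted', count, 'delayed jobs'))
//   .catch(err => console.error('Promotion failed', err));
```

Attaching a `.catch()` as in the usage comment also avoids the UnhandledPromiseRejectionWarning seen in the original output if a promotion fails for some other reason.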