I would like to process scheduled jobs using node.js bull. I have two processors that handle two types of jobs, and one configurator (the scheduler) that uses cron to add the configured jobs to the bull queue.
The scheduler will be one microservice, and each of the processors will be a separate microservice, so I will have three microservices in total.
My question is: am I using the correct pattern with bull?
_index.js_
const Queue = require('bull');
const fetchQueue = new Queue('MyScheduler');
fetchQueue.add("fetcher", {name: "earthQuakeAlert"}, {repeat: {cron: '1-59/2 * * * *'}, removeOnComplete: true});
fetchQueue.add("fetcher", {name: "weatherAlert"}, {repeat: {cron: '3-59/3 * * * *'}, removeOnComplete: true});
_processor-configurator.js_
const Queue = require('bull');
const scheduler = new Queue("MyScheduler");
scheduler.process("processor", __dirname + "/alert-processor");
_fetcher-configurator.js_
const Queue = require('bull');
const scheduler = new Queue("MyScheduler");
scheduler.process("fetcher", __dirname + "/fetcher");
_fetcher.js_
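const Queue = require('bull');
const moment = require('moment');

// Create the queue once per processor process, not once per job,
// so that each job does not open new Redis connections.
const scheduler = new Queue('MyScheduler');

module.exports = function (job) {
  console.log("Inside processor ", job.data, moment().format("YYYY-MM-DD HH:mm:ss"));
  // Return the promise so the job completes only after the follow-up job is queued.
  return scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
};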
_alert-processor.js_
const moment = require('moment');

// An async function already returns a promise, so no explicit Promise.resolve() is needed.
module.exports = async (job) => {
  console.log("Alert Processor", job.data, moment().format("YYYY-MM-DD HH:mm:ss"));
};
There will be three microservices: the scheduler and the two processors.
With this setup, I sometimes get the error _Missing process handler for job type_.
Bull version: 3.10.0
Note that I have also posted this question on Stack Overflow, but I don't see many questions about bull.js there, so I am reposting it here.
Looks good. However, you cannot use named processors like that: you would need to define both processors in every processor instance. I think you would be better off using two queues, one for the alerts and one for the fetcher.
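To make "define both processors in every instance" concrete, here is a minimal sketch of a worker that registers a handler for each job name on the shared queue. The helper names `registerAllProcessors` and `startWorker` are hypothetical; the file names and the queue name are taken from the question. Bull is loaded inside `startWorker` only so that the registration logic can be read and exercised without a Redis connection.

```javascript
const path = require('path');

// Hypothetical helper: registers a handler for every job name that can
// appear on the shared queue, so this worker never receives a job it
// cannot handle.
function registerAllProcessors(queue, baseDir) {
  queue.process('fetcher', path.join(baseDir, 'fetcher'));
  queue.process('processor', path.join(baseDir, 'alert-processor'));
}

// Hypothetical entry point: every worker service attached to
// 'MyScheduler' would run this same code. Requires a reachable Redis.
function startWorker() {
  const Queue = require('bull');
  const scheduler = new Queue('MyScheduler');
  registerAllProcessors(scheduler, __dirname);
  return scheduler;
}
```

Note that this makes both worker services identical, which works against the goal of one microservice per job type. That is why separate queues are probably the better fit here.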
Thanks @manast for your response.
"you will need to define the 2 processors in every processor instance"
I am not clear on the above.
Also, what is the reason for the error _Missing process handler for job type_?
Hi! This happens because both workers use the same queue. A worker takes the next job from the queue, receives a job of the wrong type (e.g. "fetcher" instead of "processor"), and fails because it knows how to handle "processor" but not "fetcher". Bull does not let a worker take only the jobs it is compatible with, so every worker on a queue must be able to process every type of job on that queue. The simplest solution is to use two different queues, one for processors and one for fetchers. You can then remove the names from the jobs and processors; they are no longer needed because the queue itself identifies the job type.
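A rough sketch of the two-queue layout, assuming one function per microservice. The queue names `FetchQueue` and `AlertQueue`, the function names, and the extra `source` field are all hypothetical; the cron expressions and job data come from the question. Each function takes its queues as arguments, and Bull is only loaded in `main`, which needs a reachable Redis.

```javascript
// Scheduler service: adds the repeatable fetch jobs (no job name needed).
function scheduleFetches(fetchQueue) {
  const opts = (cron) => ({ repeat: { cron }, removeOnComplete: true });
  return Promise.all([
    fetchQueue.add({ name: 'earthQuakeAlert' }, opts('1-59/2 * * * *')),
    fetchQueue.add({ name: 'weatherAlert' }, opts('3-59/3 * * * *')),
  ]);
}

// Fetcher service: the only kind of consumer on fetchQueue, so it can
// register a single unnamed processor.
function startFetcher(fetchQueue, alertQueue) {
  fetchQueue.process(async (job) => {
    console.log('Fetcher', job.data);
    // 'source' is a hypothetical field added for illustration.
    await alertQueue.add(
      { name: 'Email needs to be sent', source: job.data.name },
      { removeOnComplete: true }
    );
  });
}

// Alert service: the only kind of consumer on alertQueue.
function startAlertProcessor(alertQueue) {
  alertQueue.process(async (job) => {
    console.log('Alert Processor', job.data);
  });
}

// Wiring with Bull, shown in one place for brevity. In the three-service
// setup, each service would create only the queues it uses and call only
// its own function.
function main() {
  const Queue = require('bull');
  const fetchQueue = new Queue('FetchQueue');
  const alertQueue = new Queue('AlertQueue');
  startFetcher(fetchQueue, alertQueue);
  startAlertProcessor(alertQueue);
  return scheduleFetches(fetchQueue);
}
```

Because each queue now carries a single job type, a worker can no longer pick up a job it has no handler for, which should remove the _Missing process handler for job type_ error.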
Thanks @stansv. Understood the queue mechanism now.