It is in my feature request list, but currently I have no enough knowledge about it.
I will give it a try and see if I can easily implement it, at least to show you if it was correctly implemented in Moleculer.
@lardissone do u have a PR of this ?
Hey ! Thanks for Moleculer, it is a great tool.
+1 for a working queue, it is mandatory for clustering.
I tried to modify Moleculer to use NATS group queue (options in constructor), but I don't think it is a good idea with the current implementation.
Please somebody write for me what are the main features in task queues/event queues function. I plan to implement it, but I have no enough knowledge and need more information. Thanks!
Ping: @lardissone @pentateu @remipassmoilesel
Hey Icebob,
See this option on nats/node client: https://github.com/nats-io/node-nats#queue-groups
And a hack I have done on moleculer: https://github.com/remipassmoilesel/moleculer/tree/nats-experiments/work-queue-samples
A basic example can be a clustered microservice to process images (e.g. ImageProcessor)
Twenty instances of ImageProcessor are available on a cluster. A consummer ask for processing an image, and send a message. Only one instance should receive this message in order to process the image only once.
It is a simple option in node/nat client but you need to specify it every time you call nodenat.subscribe() And unfortunately with your implementation you call subscribe() only once. So the hack use the constructor of NATS transport. I don't think it is a good idea.
Thanks @remipassmoilesel. With the current Moleculer protocol you can't use the NATS queueing.
I now checked the Bull and it is nice. I'm going to try to add it as only a module.
Potential schema:
module.exports = {
name: "worker",
mixins: [BullService(connOpts)],
// Subscribe to queues
queues: {
"image.resize"(data, job) {
// Resize image
// job.process(30);
return Promise.resolve("done");
}
},
methods: {
something() {
// Add a job to the queue
this.enqueue("image.resize", { filename: "", width: 640, height: 480 });
}
}
}
@icebob That a clear example of what I meant with queues support in Moleculer.
Bull looks promising, haven't used it yet.
I created an experimental service for Bull. I think it is working, but please try somebody, who can.
I didn't publish on NPM because it is a very basic implementation, so if you would like to test, please copy the source of service to your project. It is very simple.
Source of service: https://github.com/ice-services/moleculer-addons/blob/db-v5/packages/moleculer-queue-bull/src/index.js
"use strict";
let { ServiceBroker } = require("moleculer");
let BullService = require("../../index");
let broker = new ServiceBroker({ logger: console });
broker.createService({
mixins: [BullService()],
queues: {
"sample.task"(job) {
this.logger.info("New job received!", job.data);
return new this.Promise(resolve => {
setTimeout(() => resolve({
done: true,
id: job.data.id,
worker: process.pid
}), 500);
});
}
}
});
broker.start();
"use strict";
let { ServiceBroker } = require("moleculer");
let BullService = require("../../index");
let broker = new ServiceBroker({ logger: console });
broker.createService({
mixins: [BullService()],
started() {
let id = 1;
setInterval(() => {
this.logger.info("Add a new job. ID: ", id);
this.enqueue("sample.task", { id: id++, pid: process.pid });
}, 2000);
this.getQueue("sample.task").on("global:completed", (job, res) => {
this.logger.info("Job completed!. Result:", res);
});
}
});
broker.start();
Requirements:
npm install bullYou can configure Bull in mixins:
mixins: [BullService("redis://localhost:6379", queueOpts)],
More info: https://github.com/OptimalBits/bull/blob/master/REFERENCE.md#queue
I just published a new task queue mixin for Bee-Queue: https://github.com/ice-services/moleculer-addons/tree/master/packages/moleculer-bee-queue#readme
Tomorrow I'm going to release an other one for Bull
Has anyone considered implementing a task queue mixin for AMQP? I just think that since there is already an AMQP transporter it would make sense to be able to leverage the same infrastructure (MQ) for task queueing. @icebob what's your opinion on this?
+1 for AMQP
Most helpful comment
Has anyone considered implementing a task queue mixin for AMQP? I just think that since there is already an AMQP transporter it would make sense to be able to leverage the same infrastructure (MQ) for task queueing. @icebob what's your opinion on this?