一个服务接受请求一般分为两种
这个 RFC 主要讨论第二种模型:订阅模型,讨论如何统一这种编码风格。
订阅模型常见的是消息中间件,如社区已经实现的 kafka,虽然还没有 consumer 的例子,我可以简单写一下在 egg 使用的方式
// app.js
module.exports = app => {
const consumer = new Consumer({});
consumer.on('message', function (message) {
co(function() {
const ctx = app.createAnonymousContext();
yield ctx.service.kafka.consume(message);
});
});
};
实现起来会显得非常繁琐,如果变为真正的企业应用,事件包 co 还会有潜在风险,比如这里 co 没有 catch,而且是否需要返回回执?而且代码没处放都写在 app.js 非常不友好。
订阅模型是一种模式,所以 Egg 会提供一个基类和特定写法。
提供 Subscription 基类,这个类继承 BaseContextClass。
const Subscription = require('egg').Subscription;
所有的子类必须实现 subscribe 方法
const Subscription = require('egg').Subscription;
class MessageConsumer extends Subscription {
async subscribe(message) {
await this.ctx.service.kafka.consume(message);
}
}
module.exports = MessageConsumer;
这种模式不会约定目录,但是建议实现方实现 loader,比如 kafka 插件自动加载 app/kafka/message_consumer.js 文件中的类。
实现放可以继承 Subscription 覆盖基类
现在的 egg-schedule 也非常类似订阅模型,如果改造可以改为
// {app_root}/app/schedule/cleandb.js
const Subscription = require('egg').Subscription;
class CleanDB extends Subscription {
static get schedule() {
type: 'worker',
cron: '0 0 3 * * *',
}
* subscribe() {
yield this.ctx.service.db.cleandb();
}
}
module.exports = CleanDB;
哈,前几天刚想重构下 egg-schedule,写了一半
app.schedule.use(CustomStrategy) 替换掉现在的 agent[SCHEDULE_HANDLER]app.schedule.start(job / jobId),启动一个任务app.schedule.stop(job / jobId),停止一个任务app.messenger.on('schedule', function(job, stats) {}) 任务开始执行的消息```js
class CleanDB extends Subscription {
static get schedule() {
type: 'worker',
cron: '0 0 3 * * *',
}
subscribe() {
yield this.ctx.service.db.cleandb();
}
afterEach() {}
定时任务用agenda不是更好么
定位不一样。agenda 需要依赖 mongodb,有需要的话,这个可以通过 egg-schedule 的扩展功能集成即可。
发自我的 iPhone
在 2017年9月27日,18:21,linxuanwei notifications@github.com 写道:
agenda
是的,egg-schedule 本身是单机的,集群的扩展 egg-schedule 就可以了。
@ntfs32
还没有领会到这样设计的好处,特别对egg-schedule来说,因为没有服务端,不需要发送回执吧?
请问 egg-schedule 现在支持 stop 某个任务否?
cc @gxcsoccer ,强制 done 的设计问题也一并看看。
cc @gxcsoccer ,强制 done 的设计问题也一并看看。
@JacksonTian 的意思要业务主动确认下下消费成功,他举的例子如下业务主动 try catch 了,但是忘记 rethrow,消息就会被丢掉
consumer.subscribe(config.topic, '*', function* (msg) {
console.log(`receive message, msgId: ${msg.msgId}, body: ${msg.body.toString()}`)
try {
// 业务处理抛异常
} catch(err) {
// 这里没有 retry,消息就丢了
}
});
如果没抛异常就认为是成功了,如果抛异常了就失败了。
如果没抛异常就认为是成功了,如果抛异常了就失败了。
+1,我觉得可以 try catch 忘记 re-throw 也可以忘记调用 done,从逻辑上增加一个 done 没道理。
如果用户忘记调用 done,那是用户的问题,会造成消息堆积,使得他能够重新审视代码的问题。
如果默认 done,失败会很容易造成静默,消息就被标记为消费了(但实际上没有)。
前一种会把问题暴露出来,后一种,就。
这个文档写清楚就好了,一般都要给回执的,不然调度方有些还有可能重发。
可以参考rxjs
能否帮忙构思下egg rabbitmq 基于订阅模型如何编写?
@popomore 是的,egg-schedule 本身是单机的,集群的扩展 egg-schedule 就可以了。
是否可以给个集群拓展的example呢?或者思路呢?
@Quinton @brucecodezone https://eggjs.app/zh-cn/basics/schedule.html#%E6%89%A9%E5%B1%95%E5%AE%9A%E6%97%B6%E4%BB%BB%E5%8A%A1%E7%B1%BB%E5%9E%8B
@atian25 这个要依赖消息服务的调度吧?目前遇到的情况是,同一份代码跑了部署在多台服务上,并订阅redis 一个db的key失效事件,收到失效事件后,只要一台服务器的一个worker执行,然后根据最新的数据再向redis插入一个key并设置失效事件,如此循环。目前的方式是另写一个项目单机起了一个服务来处理。
@brucecodezone 集群情况,肯定是必须依赖外部服务的调度的
@atian25 thanks