Egg: [RFC] 订阅模型

Created on 27 Sep 2017  ·  22Comments  ·  Source: eggjs/egg

一个服务接受请求一般分为两种

  1. 常见的是请求响应模型,不管是 http 还是 tcp,接受到一个请求经过处理后马上响应,客户端会等待响应。
  2. 另一种是异步响应模型,一个请求发送过来立即响应后再处理,处理完成后根据情况是否返回回执。

这个 RFC 主要讨论第二种模型:订阅模型,讨论如何统一这种编码风格。

订阅模型常见的是消息中间件,如社区已经实现的 kafka,虽然还没有 consumer 的例子,我可以简单写一下在 egg 使用的方式

// app.js
module.exports = app => {
  const consumer = new Consumer({});
  consumer.on('message', function (message) {
    co(function() {
      const ctx = app.createAnonymousContext();
      yield ctx.service.kafka.consume(message);
    });
  });
};

实现起来会显得非常繁琐,如果变为真正的企业应用,事件包 co 还会有潜在风险,比如这里 co 没有 catch,而且是否需要返回回执?而且代码没处放都写在 app.js 非常不友好。

设计

订阅模型是一种模式,所以 Egg 会提供一个基类和特定写法。

  1. 提供 Subscription 基类,这个类继承 BaseContextClass。

    const Subscription = require('egg').Subscription;
    
  2. 所有的子类必须实现 subscribe 方法

    const Subscription = require('egg').Subscription;
    class MessageConsumer extends Subscription {
       async subscribe(message) {
          await this.ctx.service.kafka.consume(message);
       }
    }
    module.exports = MessageConsumer;
    
  3. 这种模式不会约定目录,但是建议实现方实现 loader,比如 kafka 插件自动加载 app/kafka/message_consumer.js 文件中的类。

  4. 实现放可以继承 Subscription 覆盖基类

Schedule

现在的 egg-schedule 也非常类似订阅模型,如果改造可以改为

// {app_root}/app/schedule/cleandb.js
const Subscription = require('egg').Subscription;

class CleanDB extends Subscription {
  static get schedule() {
    type: 'worker',
    cron: '0 0 3 * * *',
  }

  * subscribe() {
    yield this.ctx.service.db.cleandb();
  }
}

module.exports = CleanDB;
proposals

All 22 comments

哈,前几天刚想重构下 egg-schedule,写了一半

egg-schedule

  • app.schedule.use(CustomStrategy) 替换掉现在的 agent[SCHEDULE_HANDLER]
  • app.schedule.start(job / jobId),启动一个任务
  • app.schedule.stop(job / jobId),停止一个任务
  • app.messenger.on('schedule', function(job, stats) {}) 任务开始执行的消息

```js

class CleanDB extends Subscription {
static get schedule() {
type: 'worker',
cron: '0 0 3 * * *',
}

  • subscribe() {
    yield this.ctx.service.db.cleandb();
    }

  • afterEach() {}

  • after() {}
    }
    ``

定时任务用agenda不是更好么

定位不一样。agenda 需要依赖 mongodb,有需要的话,这个可以通过 egg-schedule 的扩展功能集成即可。

发自我的 iPhone

在 2017年9月27日,18:21,linxuanwei notifications@github.com 写道:

agenda

是的,egg-schedule 本身是单机的,集群的扩展 egg-schedule 就可以了。

@ntfs32

还没有领会到这样设计的好处,特别对egg-schedule来说,因为没有服务端,不需要发送回执吧?

请问 egg-schedule 现在支持 stop 某个任务否?

cc @gxcsoccer ,强制 done 的设计问题也一并看看。

cc @gxcsoccer ,强制 done 的设计问题也一并看看。

@JacksonTian 的意思要业务主动确认下下消费成功,他举的例子如下业务主动 try catch 了,但是忘记 rethrow,消息就会被丢掉

consumer.subscribe(config.topic, '*', function* (msg) {
  console.log(`receive message, msgId: ${msg.msgId}, body: ${msg.body.toString()}`)
  try {
    // 业务处理抛异常
  } catch(err) {
    // 这里没有 retry,消息就丢了
  }
});

如果没抛异常就认为是成功了,如果抛异常了就失败了。

如果没抛异常就认为是成功了,如果抛异常了就失败了。

+1,我觉得可以 try catch 忘记 re-throw 也可以忘记调用 done,从逻辑上增加一个 done 没道理。

如果用户忘记调用 done,那是用户的问题,会造成消息堆积,使得他能够重新审视代码的问题。

如果默认 done,失败会很容易造成静默,消息就被标记为消费了(但实际上没有)。

前一种会把问题暴露出来,后一种,就。

这个文档写清楚就好了,一般都要给回执的,不然调度方有些还有可能重发。

可以参考rxjs

能否帮忙构思下egg rabbitmq 基于订阅模型如何编写?

@popomore 是的,egg-schedule 本身是单机的,集群的扩展 egg-schedule 就可以了。
是否可以给个集群拓展的example呢?或者思路呢?

@Quinton @brucecodezone https://eggjs.app/zh-cn/basics/schedule.html#%E6%89%A9%E5%B1%95%E5%AE%9A%E6%97%B6%E4%BB%BB%E5%8A%A1%E7%B1%BB%E5%9E%8B

@atian25 这个要依赖消息服务的调度吧?目前遇到的情况是,同一份代码跑了部署在多台服务上,并订阅redis 一个db的key失效事件,收到失效事件后,只要一台服务器的一个worker执行,然后根据最新的数据再向redis插入一个key并设置失效事件,如此循环。目前的方式是另写一个项目单机起了一个服务来处理。

@brucecodezone 集群情况,肯定是必须依赖外部服务的调度的

@atian25 thanks

Was this page helpful?
0 / 5 - 0 ratings

Related issues

killagu picture killagu  ·  48Comments

popomore picture popomore  ·  53Comments

popomore picture popomore  ·  47Comments

itsky365 picture itsky365  ·  62Comments

musicode picture musicode  ·  55Comments