Egg: 如何在启动agent之前处理一些简单的业务?

Created on 26 Jan 2018  ·  12Comments  ·  Source: eggjs/egg

需求是想要在agent加载之前从数据库加载数据缓存到内存或redis,然后再启动agent,连接kafka,处理内存数据。
请问如何实现?

Most helpful comment

不会,整个过程其实就是:

  1. worker 加载缓存完成后,告诉 agent:「报告,士兵 A 备战完毕」。
  2. agent 等所有 worker 都搞完后,告诉 kafka,「报告,01 连备战完毕,随时等待召唤」。
  3. kafka 下发消息告诉 agent,「这次任务就安排给你们组了,你随便叫组内一个特种兵来做即可」。
  4. agent 告诉士兵 A:这次你独自出击。

All 12 comments

beforeStart

但真的一定要在 agent 启动前做么?而不是启动完后,做了这些事,然后再触发后续动作

@atian25
app.js beforeStart 加载缓存,
在agent.js中

module.exports = agent => {
    agent.messenger.on('egg-ready', () => {
      const consumer = agent.kafka.comsumer(consumerOptions);
      consumer.start(function(err) {
        if (err) {
          console.log('Error occurred when starting consumer. err:', err);
        } else {
          console.log('Started consumer successfully');
        }
      });
      consumer.on('message', (msg, metadata) => {
        const string = msg.toString();
        console.log('收到kafka消息:', string)
        const action = 'xx_action';
        agent.messenger.sendRandom(action, string);
      });

      consumer.on('error', function(err) {
        console.log('On error. err:', err);
      });

    });
};

其实就是用agent来接收kafka消息,交给worker们处理业务。
这样处理不知有没有问题?
看之前的[issue]: https://github.com/eggjs/egg/issues/1925#issuecomment-355519685
有一些疑虑。

那边提到的 横向扩展 指的是:

  • 如果你把调度逻辑放到 agent 里面来做,那你集群多台机器的时候,就出问题了。
  • 我看你描述的场景,有点类似于集群类型的任务调度吧,这个没问题的。我们的 egg-schedule 有预留了扩展接口,你可以封装下,我们内部也有类似的实现,简单的说,就是:

    • 集群的所有 worker 都做为消费者,登记到外部的调度系统,并注册到消息中间件。

    • egg-schedule 扩展增加一个 cluster 模式

    • 外部调度系统会选择一个消费者,然后通过消息中间件的消息通知,然后随机选择一个 worker 来执行。

你这种场景下,我觉得可以是所有的 worker 都缓存好后(或者在数据量的不大情况下,agent 来缓存然后发给 worker),再发一个事件通知 agent,再通知 kafka 那边该实例已经准备好,可以接受任务了。

多台机器消费同一个组下的topic,kafka在分发消息的时候对消费相同topic的机器随机发送一次就不会出现消息重复的问题了,所以横向扩展的问题利用kafka消息分发机制也可以解决了。

嗯,这里是 2 个问题了:

  • 横向扩展的那个,配合 kafka 和 egg-schedule 的 cluster 扩展就搞定了。
  • 你这个 issue 的,我的建议是,在缓存完后,再通知准备完毕,可以开始消费 kafka 消息,而不是双方用 egg-ready 这个事件。

嗯,我大概明白你的意思了
简单写了下思路
app.js

module.exports = app => {
  console.log('====================app waiting start==================');
  app.messenger.once('egg-ready', () => {
    console.log('====================app egg-ready====================');
    console.log('app sendToAgent data');
    // sendToAgent之前,这里加载缓存
    app.messenger.sendToAgent('agent-event', { foo: 'send to agent' });
    console.log('app app-event data');
    app.messenger.sendToApp('app-event', { foo: 'send to app' });
  });

  app.messenger.on('app-event', data => {
    console.log('event app-event recive data===', data);
  });
};

agent.js

module.exports = agent => {
  agent.messenger.once('egg-ready', () => {
    console.log('====================agent egg-ready====================');
  })
  // app-event  告诉agent缓存加载完毕,可以消费kafka消息了
  agent.messenger.on('agent-event', data => {
    console.log('event agent-event recive data===', data);
  })
};

打印

➜  open-webhook (master) ✗ npm run dev

> [email protected] dev /Users/dang/projects/open-webhook
> egg-bin dev --port=7005

2018-01-29 14:36:56,675 INFO 51689 [master] node version v8.9.3
2018-01-29 14:36:56,677 INFO 51689 [master] egg version 2.3.0
Db.prototype.authenticate method will no longer be available in the next major release 3.x as MongoDB 3.6 will only allow auth against users in the admin db and will no longer allow multiple credentials on a socket. Please authenticate using MongoClient.connect with auth credentials.
2018-01-29 14:36:57,395 INFO 51689 [master] agent_worker#1:51690 started (716ms)
====================app waiting start==================
====================router.js start==================
Db.prototype.authenticate method will no longer be available in the next major release 3.x as MongoDB 3.6 will only allow auth against users in the admin db and will no longer allow multiple credentials on a socket. Please authenticate using MongoClient.connect with auth credentials.
2018-01-29 14:36:58,228 INFO 51689 [master] egg started on http://127.0.0.1:7005 (1551ms)
====================app egg-ready====================
app sendToAgent data
app app-event data
event app-event recive data=== { foo: 'send to app' }
====================agent egg-ready====================
event agent-event recive data=== { foo: 'send to agent' }

大致是这样,不过要注意 worker 是多个的,还有存在 worker 重启的情况。
还有 event 名是可以自定义的,不一定要 agent-event.
schedule 扩展看 https://github.com/eggjs/egg-schedule#schedule-type

非常感谢 @atian25

  1. 关于worker 我这边采用agent.messenger.sendRandom(action, data)应该不会有太大问题吧;
  2. 关于schedule 扩展 cluster 这个现在还没开始做,有问题我再另开一个issue;

不会,整个过程其实就是:

  1. worker 加载缓存完成后,告诉 agent:「报告,士兵 A 备战完毕」。
  2. agent 等所有 worker 都搞完后,告诉 kafka,「报告,01 连备战完毕,随时等待召唤」。
  3. kafka 下发消息告诉 agent,「这次任务就安排给你们组了,你随便叫组内一个特种兵来做即可」。
  4. agent 告诉士兵 A:这次你独自出击。

嗯,明白。
考虑到以后数据量非常大的时候可能得换个方案了,比如多进程研发模式增强 https://eggjs.org/zh-cn/advanced/cluster-client.html
另外一个消耗瓶颈在消费kafka消息后用async/await同步存库这块,采用异步存库可能会更好吧,不知道会不出现其它问题
@atian25 在性能方面有何建议?

问下,在agent中能否处理公共的,比较耗时的操作呢,例如:提供公共导出pdf、doc或excel文件服务到 agent进程中,其他 worker 发送请求时 将 导出任务添加到 agent的导出任务队列中,我现在有这样的需求,不知道会不会有问题

问下,在agent中能否处理公共的,比较耗时的操作呢,例如:提供公共导出pdf、doc或excel文件服务到 agent进程中,其他 worker 发送请求时 将 导出任务添加到 agent的导出任务队列中,我现在有这样的需求,不知道会不会有问题

这些建议放到阿里云函数计算来做

Was this page helpful?
0 / 5 - 0 ratings

Related issues

lvgg3271 picture lvgg3271  ·  3Comments

yuu2lee4 picture yuu2lee4  ·  3Comments

dizhifeng picture dizhifeng  ·  3Comments

Webjiacheng picture Webjiacheng  ·  3Comments

bupafengyu picture bupafengyu  ·  3Comments