需求是想要在agent加载之前从数据库加载数据缓存到内存或redis,然后再启动agent,连接kafka,处理内存数据。
请问如何实现?
有 beforeStart
但真的一定要在 agent 启动前做么?而不是启动完后,做了这些事,然后再触发后续动作
@atian25
app.js beforeStart 加载缓存,
在agent.js中
module.exports = agent => {
agent.messenger.on('egg-ready', () => {
const consumer = agent.kafka.comsumer(consumerOptions);
consumer.start(function(err) {
if (err) {
console.log('Error occurred when starting consumer. err:', err);
} else {
console.log('Started consumer successfully');
}
});
consumer.on('message', (msg, metadata) => {
const string = msg.toString();
console.log('收到kafka消息:', string)
const action = 'xx_action';
agent.messenger.sendRandom(action, string);
});
consumer.on('error', function(err) {
console.log('On error. err:', err);
});
});
};
其实就是用agent来接收kafka消息,交给worker们处理业务。
这样处理不知有没有问题?
看之前的[issue]: https://github.com/eggjs/egg/issues/1925#issuecomment-355519685
有一些疑虑。
那边提到的 横向扩展 指的是:
你这种场景下,我觉得可以是所有的 worker 都缓存好后(或者在数据量的不大情况下,agent 来缓存然后发给 worker),再发一个事件通知 agent,再通知 kafka 那边该实例已经准备好,可以接受任务了。
多台机器消费同一个组下的topic,kafka在分发消息的时候对消费相同topic的机器随机发送一次就不会出现消息重复的问题了,所以横向扩展的问题利用kafka消息分发机制也可以解决了。
嗯,这里是 2 个问题了:
egg-ready 这个事件。嗯,我大概明白你的意思了
简单写了下思路
app.js
module.exports = app => {
console.log('====================app waiting start==================');
app.messenger.once('egg-ready', () => {
console.log('====================app egg-ready====================');
console.log('app sendToAgent data');
// sendToAgent之前,这里加载缓存
app.messenger.sendToAgent('agent-event', { foo: 'send to agent' });
console.log('app app-event data');
app.messenger.sendToApp('app-event', { foo: 'send to app' });
});
app.messenger.on('app-event', data => {
console.log('event app-event recive data===', data);
});
};
agent.js
module.exports = agent => {
agent.messenger.once('egg-ready', () => {
console.log('====================agent egg-ready====================');
})
// app-event 告诉agent缓存加载完毕,可以消费kafka消息了
agent.messenger.on('agent-event', data => {
console.log('event agent-event recive data===', data);
})
};
打印
➜ open-webhook (master) ✗ npm run dev
> [email protected] dev /Users/dang/projects/open-webhook
> egg-bin dev --port=7005
2018-01-29 14:36:56,675 INFO 51689 [master] node version v8.9.3
2018-01-29 14:36:56,677 INFO 51689 [master] egg version 2.3.0
Db.prototype.authenticate method will no longer be available in the next major release 3.x as MongoDB 3.6 will only allow auth against users in the admin db and will no longer allow multiple credentials on a socket. Please authenticate using MongoClient.connect with auth credentials.
2018-01-29 14:36:57,395 INFO 51689 [master] agent_worker#1:51690 started (716ms)
====================app waiting start==================
====================router.js start==================
Db.prototype.authenticate method will no longer be available in the next major release 3.x as MongoDB 3.6 will only allow auth against users in the admin db and will no longer allow multiple credentials on a socket. Please authenticate using MongoClient.connect with auth credentials.
2018-01-29 14:36:58,228 INFO 51689 [master] egg started on http://127.0.0.1:7005 (1551ms)
====================app egg-ready====================
app sendToAgent data
app app-event data
event app-event recive data=== { foo: 'send to app' }
====================agent egg-ready====================
event agent-event recive data=== { foo: 'send to agent' }
大致是这样,不过要注意 worker 是多个的,还有存在 worker 重启的情况。
还有 event 名是可以自定义的,不一定要 agent-event.
schedule 扩展看 https://github.com/eggjs/egg-schedule#schedule-type
非常感谢 @atian25
agent.messenger.sendRandom(action, data)应该不会有太大问题吧;不会,整个过程其实就是:
嗯,明白。
考虑到以后数据量非常大的时候可能得换个方案了,比如多进程研发模式增强 https://eggjs.org/zh-cn/advanced/cluster-client.html
另外一个消耗瓶颈在消费kafka消息后用async/await同步存库这块,采用异步存库可能会更好吧,不知道会不出现其它问题
@atian25 在性能方面有何建议?
问下,在agent中能否处理公共的,比较耗时的操作呢,例如:提供公共导出pdf、doc或excel文件服务到 agent进程中,其他 worker 发送请求时 将 导出任务添加到 agent的导出任务队列中,我现在有这样的需求,不知道会不会有问题
问下,在agent中能否处理公共的,比较耗时的操作呢,例如:提供公共导出pdf、doc或excel文件服务到 agent进程中,其他 worker 发送请求时 将 导出任务添加到 agent的导出任务队列中,我现在有这样的需求,不知道会不会有问题
这些建议放到阿里云函数计算来做
Most helpful comment
不会,整个过程其实就是: