// DataClient
const Base = require('sdk-base');
class DataClient extends Base {
constructor(options) {
super(options);
this.ready(true);
}
subscribe(info, listener) {
// 向服务端 subscribe
}
publish(info) {
// 向服务端 publish
}
* getData(id) {}
}
cluster-client 模块保持不变,通过 cluster(DataClient).create() 生成支持多进程的 ClusterClient
const cluster = require('cluster-client');
const clusterClient = cluster(DataClient).create();
例如增加带缓存的 get 同步方法:
const cluster = require('cluster-client');
const DataClient = require('./data_client');
class APIClient extends Base {
constructor(options) {
super(options);
this._client = (options.cluster || cluster)(DataClient).create(options);
this._client.ready(() => this.ready(true));
this._cache = {};
// config.subMap:
// {
// foo: reg1,
// bar: reg2,
// }
for (const key in config.subMap) {
this.subscribe(onfig.subMap[key], value => {
this._cache[key] = value;
});
}
}
subscribe(reg, listener) {
this._client.subscribe(reg, listener);
}
publish(reg) {
this._client.publish(reg);
}
get(key) {
return this._cache[key];
}
}
module.exports = APIClient;
// app.js || agent.js
const APIClient = require('client-module'); // 上面那个模块
module.exports = app => {
const config = app.config.client;
app.client = new APIClient(Object.assign({}, config, { cluster: app.cluster.bind(this) });
app.beforeStart(function* () {
yield app.client.ready();
});
};
附:
|------------------------------------------------|
| APIClient |
| |----------------------------------------|
| | clusterClient |
| | |---------------------------------|
| | | DataClient |
|-------|------|---------------------------------|
cluster-client 模块给我们提供了强大的功能,在 sub/pub 模式下使得我们能够只开发一个客户端,在不同的进程中运行同样的 API,且复用同一个远端连接。
一个最简单的例子:
const Base = require('sdk-base');
const cluster = require('cluster-client');
class RealClient extends Base {
constructor(options) {
super(options);
this.ready(true);
}
subscribe(info, listener) {
// 向服务端 subscribe
}
publish(info) {
// 向服务端 publish
}
* getData(id) {}
}
const client = cluster(RealClient).create({});
这段代码运行于多个进程,我们在每个进程都可以使用:
client.subscribe()
client.publish()
brilliant!
但 ClusterClient 同时带来了一些约束,如果想在各进程暴露同样的方法,那么只能是 subscribe publish 和 generator 函数。
假设我要实现一个同步的 get 方法,sub 过的数据直接放入内存,使用 get 方法时直接返回。要怎么实现呢?
cluster-client 提供了 override 方法,你可能会想:
const client = cluster(RealClient)
.overide('get', function() {})
.create({});
这时你会发现不可行,因为还没有 create 之前,我们无法拿到实例进行提前 subscribe,进行缓存。
所以这里建议一种模式,再包一个 API class:
class APIClient extends Base {
constructor(options) {
super(options);
this._client = cluster(RealClient).create({});
this._client.ready(() => this.ready(true));
this._cache = {};
// config.subMap:
// {
// foo: reg1,
// bar: reg2,
// }
for (const key in config.subMap) {
this.subscribe(onfig.subMap[key], value => {
this._cache[key] = value;
});
}
}
subscribe(reg, listener) {
this._client.subscribe(reg, listener);
}
publish(reg) {
this._client.publish(reg);
}
get(key) {
return this._cache[key];
}
}
这样我们就可以:
client.get('foo')
当然这个例子太简单,看起来收益并不大。但是有些功能不仅仅是 sub 到数据就可以了,还需要做出复杂的处理,这种方式就变得有意义了。
好处还有提供一种统一的拓展模式,否则只要是同步的 API,开发者都需要实现一种 patch 的方式,写法太灵活了。
总结一下:
|------------------------------------------------|
| SDKClient |
| |----------------------------------------|
| | ClusterClient |
| | |---------------------------------|
| | | RealClient |
|-------|------|---------------------------------|
这种增强可以进一步固化到 cluster-client 中。
const client = cluster(RealClient, SDKClient)
通过这种方式一键返回最后需要使用的客户端。当然,对于只需要 pub/sub 的简单客户端,直接 cluster(RealClient) 即可,和原来是一样的。
最后这个 SDKClient 能给个具体的例子吗?
const client = cluster(RealClient, SDKClient)
没看懂为啥不能 delegate 普通函数?
是因为在调用 sub 时如果是 follower 直接走了 cluster 的逻辑而没走 realclient 的逻辑么?
这里就是我之前提出的问题,cluster-client 在这里切的太早了,我认为应该是由 realclient 自行转调。
RealClient extends ClusetClient,如果是 follower 在 sub 时还是调用 RealClient 的 sub,在这个函数里再去调用 ClusetClient 的 sub 去连 leader。
没看懂为啥不能 delegate 普通函数?
如果是同步的 API,根本没法代理的,因为我们是通过 tcp 连接做 invoke,所以势必只能是异步的 API 才能 work。
@popomore
最后这个 SDKClient 能给个具体的例子吗?
上面那个 SDKClient 的 class 就是例子。当然还可以进一步精简,例如提供一个基类,转调 subscribe 和 publish 可以自动完成。
@gxcsoccer
所以归根到底的问题还是 follower 做 sub 没法缓存数据,get 就取不到了
Shawn notifications@github.com于2017年2月6日 周一21:53写道:
没看懂为啥不能 delegate 普通函数?
如果是同步的 API,根本没法代理的,因为我们是通过 tcp 连接做 invoke,所以势必只能是异步的 API 才能 work。
@popomore https://github.com/popomore
—
You are receiving this because you were mentioned.Reply to this email directly, view it on GitHub
https://github.com/eggjs/egg/issues/322#issuecomment-277687037, or mute
the thread
https://github.com/notifications/unsubscribe-auth/AAWA1ZtxvVg247bOAeMyHvHApq5Geuvuks5rZyXkgaJpZM4L4EqW
.
是的
不仅仅有函数的问题,还有一些需求,我们想提供事件、以及某些public 属性,只通过现在的 ClusterClient 还是挺难做到的。
感觉我说的方式才能真正解决问题
Shawn notifications@github.com于2017年2月6日 周一21:57写道:
不仅仅有函数的问题,还有一些需求,我们想提供事件、以及某些public 属性,只通过现在的 ClusterClient 还是挺难做到的。
—
You are receiving this because you were mentioned.Reply to this email directly, view it on GitHub
https://github.com/eggjs/egg/issues/322#issuecomment-277688031, or mute
the thread
https://github.com/notifications/unsubscribe-auth/AAWA1cW_DJQm_FBeX32MAGG61BJnjkfJks5rZya5gaJpZM4L4EqW
.
这样另外一个好处是,agent 和 app worker 中 client 的 API 真正达到了完全的一致,因为访问的都是 SDKClient。
内部的 antvip 实际上用的是这个模式做到 API 完全抹平(主要是这个模块的同步 API 计算逻辑太多,不是简单在两个进程里都搞一些就搞的定的),当时因为没有广泛使用的概念抽象,理解起来反而复杂了。所以想试试结合现在的 cluster-client,如果能推广开就更好了。
@popomore 你说的那个方式其实和我的比较类似,你是想暴露的是 realclient 给用户使用,内部转调 clusterClient,或者 extend。但是这样有个问题,realclient 里的实现不好做,需要考虑当前跑在哪里,sub 和 pub 这块以及 invoke 的函数设计都不好做了。或者你可以举个例子,写写伪代码看看。
@shaoshuai0102 你的这个例子,ClusterClient 貌似不需要修改什么,就是外边再做一次封装
class APIClient extends Base {
constructor(options) {
super(options);
this._client = cluster(RealClient).create({});
this._client.ready(() => this.ready(true));
this._cache = {};
// config.subMap:
// {
// foo: reg1,
// bar: reg2,
// }
for (const key in config.subMap) {
this.subscribe(onfig.subMap[key], value => {
this._cache[key] = value;
});
}
}
subscribe(reg, listener) {
this._client.subscribe(reg, listener);
}
publish(reg) {
this._client.publish(reg);
}
get(key) {
return this._cache[key];
}
}
是的。cluster-client 主体不修改的,但定位变了,本来是用来解决全部问题,包括抹平 API,现在变成只解决数据同步方面的 API。最终的用的是 APIClient(内部转调 ClusterClient)。整体方案可以内置到 cluster-client 模块中,以 cluster(RealClient, APIClient) 的方式来提供服务。
是不是可以约定 cluster-client 封装的都是纯接口的 client (不带业务逻辑,不包含缓存等等),然后外边再对其进行业务封装
是的。也可以说只做数据操作。
举个简单的例子,比如 configclient,以前我们会对订阅到的数据进行处理后再吐给业务,现在我们就将它拆分成两部分,一部分就是纯粹的订阅、发布 api 实现,这部分可以很容易的被 cluster-client 封装,另外一份就是对它的进一步封装
是的是的
有点 meteorjs 同步数据的感觉? worker 是前端调用,agent 是后端 rpc 代理 。
我的想法是 cluster 基类的 API 和 net 的一致,比如调用 sub 时 cluster 来判断是否调用远程 server 还是自己实现。如果难实现可以开放一个函数让客户端来实现自己的 sub
@popomore 写一下例子解释下你的想法吧 没 get 到。
明天写个,不在电脑旁
实例代码 @shaoshuai0102 @gxcsoccer
const { BaseClient, SUBSCRIBE } = require('cluster-client');
class Client extends BaseClient {
get() {
// 获取缓存
}
subscribe(...args) {
// 做些 hack,缓存订阅代码
// 父类判断是否为 leader
// leader 调用 client 端的实现
// follower 转调 leader 的客户端的实现。
super.subscribe(...args);
}
// 实现本客户端的 sub
// 这里我不是很清楚能否抽象,最差就是客户端自己实现
[SUBSCRIBE]() {
}
}
举个简单的例子,比如 configclient,以前我们会对订阅到的数据进行处理后再吐给业务,现在我们就将它拆分成两部分,一部分就是纯粹的订阅、发布 api 实现,这部分可以很容易的被 cluster-client 封装,另外一份就是对它的进一步封装
@gxcsoccer @shaoshuai0102 如果是这样的话,我觉得这个功能不应该由 configclient 来实现,应该有一个基于 configclient 的 client 来实现,它不需要 cluster-client 模式,在每个 worker 都初始化一个实例即可。
drm 为例,分成2个 client:drm-raw-client, drm-client,drm-raw-client 走 cluster-client 实现 sub,然后 drm-client 包装 drm-raw-client,实现 sub 和 get。
哦,看来就是 @shaoshuai0102 正文描述的方案。。。我多余了。
我上线写的思路和 @shaoshuai0102 差不多,只是不再需要 clusterclient 来启动了,只需要在基类的构造函数初始化就可以了。
@guangao 提的方案是打算把 APIClient 和 RealClient 合并成一个 Client,这个 Client 既做包含缓存的 API,也包含实际的网络交互(ClusterClient 的一些细节)。对于 egg 来说应该说是更方便的,可以写一个客户端就搞定了。(sub 的抽象细节有些不确定到底能不能这样做到)
但是一旦这样实现后,如果是非 egg 体系的场景就不能用这个客户端了。如果是我说的那个方案,RealClient + APIClient 组合仍然可以拿出去在非 egg 场景使用。从这个角度看,应该更灵活些。
不知道非 egg 体系是指哪些,我们现在基本是强约定,不建立在一定知识基础上的话是很难理解这个模块的。
如果不走 agent 模式,可以让客户端都是 leader,并且关闭 server,这样客户端调用的方法就是自己实现的。
这个本身也是为多进程模型设计的,不完全依赖于 egg
@shaoshuai0102 更新下结论吧?
在写
讨论结论:
// DataClient
const Base = require('sdk-base');
class DataClient extends Base {
constructor(options) {
super(options);
this.ready(true);
}
subscribe(info, listener) {
// 向服务端 subscribe
}
publish(info) {
// 向服务端 publish
}
* getData(id) {}
}
cluster-client 模块保持不变,通过 cluster(DataClient).create() 生成支持多进程的 ClusterClient
const cluster = require('cluster-client');
const clusterClient = cluster(DataClient).create();
例如增加带缓存的 get 同步方法:
const cluster = require('cluster-client');
const DataClient = require('./data_client');
class APIClient extends Base {
constructor(options) {
super(options);
this._client = (options.cluster || cluster)(DataClient).create(options);
this._client.ready(() => this.ready(true));
this._cache = {};
// config.subMap:
// {
// foo: reg1,
// bar: reg2,
// }
for (const key in config.subMap) {
this.subscribe(onfig.subMap[key], value => {
this._cache[key] = value;
});
}
}
subscribe(reg, listener) {
this._client.subscribe(reg, listener);
}
publish(reg) {
this._client.publish(reg);
}
get(key) {
return this._cache[key];
}
}
module.exports = APIClient;
// app.js || agent.js
const Client = require('client-module'); // 上面那个模块
module.exports = app => {
const config = app.config.client;
app.client = new APIClient(Object.assign({}, config, { cluster: app.cluster.bind(this) });
app.beforeStart(function* () {
yield app.client.ready();
});
};
附:
|------------------------------------------------|
| SDKClient |
| |----------------------------------------|
| | clusterClient |
| | |---------------------------------|
| | | DataClient |
|-------|------|---------------------------------|
下面这样是不是简化一点:
// app.js || agent.js
const APIClient = require('client-module');
module.exports = app => {
const config = app.config.client;
app.client = new APIClient(Object.assign({}, config, { cluster: app.cluster });
};
const cluster = require('cluster-client');
const DataClient = require('./data_client');
class APIClient extends Base {
constructor(options) {
super(options);
this._client = (options.cluster || cluster).create(DataClient).create(options);
// ...
}
不过这样写还有点不对的地方,就是要看 app.cluster 是否可以封装一下了。。不然就只能按 @shaoshuai0102 的形式。
做一下 bind 就可以了吧
app.client = new APIClient(Object.assign({}, config, { cluster: app.cluster.bind(app) });
反正意思差不多,最好是插件只需要传递一个 app.cluster 给 APIClient,APIClient 用这个 app.cluster 替换掉 ClusterClient。
我改一下
根据 @dead-horse 的建议做了简化,更新到上面的回复了
this._client = (options.cluster || cluster)(DataClient).create(options);
updated
@shaoshuai0102 可以更新到 https://eggjs.org/zh-cn/advanced/cluster.html 文档了
ZookeeperClient 就是一个开源的例子
仓库在那里 没找到
有时间我包一下
@fengmk2 zk 基于哪个你上面说的模块还是基于 https://github.com/dannycoates/zkjs ?
我看 zkjs 最后一个mr 是你的
@shaoshuai0102 请问"由于 clusterClient 的 API 已经被抹平了多进程差异"这句话中 “被抹平了多进程差异”具体指的什么?
@shaoshuai0102 请问"由于 clusterClient 的 API 已经被抹平了多进程差异"这句话中 “被抹平了多进程差异”具有指的什么?
@gxcsoccer 可以帮解释下
@miser
@shaoshuai0102 这句话的意应该就是说 clusterclient 帮你封装了进程间交互的问题,让你感知不到多进程模式的存在
我写了几次,都没法测出多进程。
process.pid打印出来,都是agent.js下的进程ID。
definition worker
app/worker/worker1.js
worker1.js
class Work1 extends eggWorker{
async dosomething1(){
console.log(`${process.pid } this is process work 1`)
}
async dosomething2(params){
//dosomething2
return xxxx
}
}
use
await this.app.worker.worker1.dosomething1();
const result = this.app.worker.worker1.dosomething2(xxxx);
这是我觉得,用户最方便使用多进程的初级版本。
更好点,要在 dosomething1内,还能通过this.service.xxx.xxx来调用service类定义的方法。
可以考虑在框架层面抹平多进程差异,比如在Service层面抹平多进程差异,所有Service都在一个独立Worker中运行
Most helpful comment
讨论结论:
1. 开发 DataClient
2. 包装出 clusterClient
cluster-client 模块保持不变,通过
cluster(DataClient).create()生成支持多进程的 ClusterClient3. 开发 APIClient
例如增加带缓存的 get 同步方法:
4. 模块向外暴露 API
5. plugin 开发
附: