Egg: 增强多进程研发模式

Created on 6 Feb 2017  ·  52Comments  ·  Source: eggjs/egg

讨论结论

1. 开发 DataClient

  • 模块开发者开发 DataClient(原来的 RealClient),只做异步 API,负责和远端服务通讯。
  • 一般只有 publish subscribe 和 generator/promise API
  • 开发这个客户端时无需了解多进程知识,只和服务端交互
// DataClient
const Base = require('sdk-base');

class DataClient extends Base {
  constructor(options) {
    super(options);
    this.ready(true);
  }

  subscribe(info, listener) {
    // 向服务端 subscribe
  }

  publish(info) {
    // 向服务端 publish
  }

  * getData(id) {}
}

2. 包装出 clusterClient

cluster-client 模块保持不变,通过 cluster(DataClient).create() 生成支持多进程的 ClusterClient

const cluster = require('cluster-client');
const clusterClient = cluster(DataClient).create();

3. 开发 APIClient

  • 对于有数据缓存等同步 API 需求的模块,再额外封装一个 APIClient
  • 用户使用到的是这个 Client 的实例
  • 异步数据获取,通过调用 ClusterClient 的 API 实现。
  • 由于 clusterClient 的 API 已经被抹平了多进程差异,所以在开发 APIClient 时,也无需关心多进程知识。

例如增加带缓存的 get 同步方法:

const cluster = require('cluster-client');
const DataClient = require('./data_client');

class APIClient extends Base {
  constructor(options) {
    super(options);

    this._client = (options.cluster || cluster)(DataClient).create(options);

    this._client.ready(() => this.ready(true));

    this._cache = {};

    // config.subMap:
    // {
    //   foo: reg1,
    //   bar: reg2,
    // }
    for (const key in config.subMap) {
      this.subscribe(onfig.subMap[key], value => {
        this._cache[key] = value;
      });
    }
  }

  subscribe(reg, listener) {
    this._client.subscribe(reg, listener);
  }

  publish(reg) {
    this._client.publish(reg);
  }

  get(key) {
    return this._cache[key];
  }
}

4. 模块向外暴露 API

module.exports = APIClient;

5. plugin 开发

// app.js || agent.js
const APIClient = require('client-module'); // 上面那个模块
module.exports = app => {
  const config = app.config.client;
  app.client = new APIClient(Object.assign({}, config, { cluster: app.cluster.bind(this) });
  app.beforeStart(function* () {
    yield app.client.ready();
  });
};

附:

|------------------------------------------------|
| APIClient                                      |
|       |----------------------------------------|
|       | clusterClient                          |
|       |      |---------------------------------|
|       |      | DataClient                      |
|-------|------|---------------------------------|

issue 原始内容如下

cluster-client 模块给我们提供了强大的功能,在 sub/pub 模式下使得我们能够只开发一个客户端,在不同的进程中运行同样的 API,且复用同一个远端连接。

一个最简单的例子:

const Base = require('sdk-base');
const cluster = require('cluster-client');

class RealClient extends Base {
  constructor(options) {
    super(options);
    this.ready(true);
  }

  subscribe(info, listener) {
    // 向服务端 subscribe
  }

  publish(info) {
    // 向服务端 publish
  }

  * getData(id) {}
}

const client = cluster(RealClient).create({});

这段代码运行于多个进程,我们在每个进程都可以使用:

client.subscribe()
client.publish()

brilliant!

但 ClusterClient 同时带来了一些约束,如果想在各进程暴露同样的方法,那么只能是 subscribe publish 和 generator 函数。

假设我要实现一个同步的 get 方法,sub 过的数据直接放入内存,使用 get 方法时直接返回。要怎么实现呢?

cluster-client 提供了 override 方法,你可能会想:

const client = cluster(RealClient)
                       .overide('get', function() {})
                       .create({});

这时你会发现不可行,因为还没有 create 之前,我们无法拿到实例进行提前 subscribe,进行缓存。

所以这里建议一种模式,再包一个 API class:

  • 异步方法可以直接转调 clusterClient 实例
  • 同步方法直接实现
class APIClient extends Base {
  constructor(options) {
    super(options);

    this._client = cluster(RealClient).create({});
    this._client.ready(() => this.ready(true));

    this._cache = {};

    // config.subMap:
    // {
    //   foo: reg1,
    //   bar: reg2,
    // }
    for (const key in config.subMap) {
      this.subscribe(onfig.subMap[key], value => {
        this._cache[key] = value;
      });
    }
  }

  subscribe(reg, listener) {
    this._client.subscribe(reg, listener);
  }

  publish(reg) {
    this._client.publish(reg);
  }

  get(key) {
    return this._cache[key];
  }
}

这样我们就可以:

client.get('foo')

当然这个例子太简单,看起来收益并不大。但是有些功能不仅仅是 sub 到数据就可以了,还需要做出复杂的处理,这种方式就变得有意义了。

好处还有提供一种统一的拓展模式,否则只要是同步的 API,开发者都需要实现一种 patch 的方式,写法太灵活了。

总结一下:

|------------------------------------------------|
| SDKClient                                      |
|       |----------------------------------------|
|       | ClusterClient                          |
|       |      |---------------------------------|
|       |      | RealClient                      |
|-------|------|---------------------------------|

这种增强可以进一步固化到 cluster-client 中。

const client = cluster(RealClient, SDKClient)

通过这种方式一键返回最后需要使用的客户端。当然,对于只需要 pub/sub 的简单客户端,直接 cluster(RealClient) 即可,和原来是一样的。

agent feature

Most helpful comment

讨论结论:

1. 开发 DataClient

  • 模块开发者开发 DataClient(原来的 RealClient),只做异步 API,负责和远端服务通讯。
  • 一般只有 publish subscribe 和 generator/promise API
  • 开发这个客户端时无需了解多进程知识,只和服务端交互
// DataClient
const Base = require('sdk-base');

class DataClient extends Base {
  constructor(options) {
    super(options);
    this.ready(true);
  }

  subscribe(info, listener) {
    // 向服务端 subscribe
  }

  publish(info) {
    // 向服务端 publish
  }

  * getData(id) {}
}

2. 包装出 clusterClient

cluster-client 模块保持不变,通过 cluster(DataClient).create() 生成支持多进程的 ClusterClient

const cluster = require('cluster-client');
const clusterClient = cluster(DataClient).create();

3. 开发 APIClient

  • 对于有数据缓存等同步 API 需求的模块,再额外封装一个 APIClient
  • 用户使用到的是这个 Client 的实例
  • 异步数据获取,通过调用 ClusterClient 的 API 实现。
  • 由于 clusterClient 的 API 已经被抹平了多进程差异,所以在开发 APIClient 时,也无需关心多进程知识。

例如增加带缓存的 get 同步方法:

const cluster = require('cluster-client');
const DataClient = require('./data_client');

class APIClient extends Base {
  constructor(options) {
    super(options);

    this._client = (options.cluster || cluster)(DataClient).create(options);

    this._client.ready(() => this.ready(true));

    this._cache = {};

    // config.subMap:
    // {
    //   foo: reg1,
    //   bar: reg2,
    // }
    for (const key in config.subMap) {
      this.subscribe(onfig.subMap[key], value => {
        this._cache[key] = value;
      });
    }
  }

  subscribe(reg, listener) {
    this._client.subscribe(reg, listener);
  }

  publish(reg) {
    this._client.publish(reg);
  }

  get(key) {
    return this._cache[key];
  }
}

4. 模块向外暴露 API

module.exports = APIClient;

5. plugin 开发

// app.js || agent.js
const Client = require('client-module'); // 上面那个模块
module.exports = app => {
  const config = app.config.client;
  app.client = new APIClient(Object.assign({}, config, { cluster: app.cluster.bind(this) });
  app.beforeStart(function* () {
    yield app.client.ready();
  });
};

附:

|------------------------------------------------|
| SDKClient                                      |
|       |----------------------------------------|
|       | clusterClient                          |
|       |      |---------------------------------|
|       |      | DataClient                      |
|-------|------|---------------------------------|

All 52 comments

最后这个 SDKClient 能给个具体的例子吗?

const client = cluster(RealClient, SDKClient)

没看懂为啥不能 delegate 普通函数?

是因为在调用 sub 时如果是 follower 直接走了 cluster 的逻辑而没走 realclient 的逻辑么?

这里就是我之前提出的问题,cluster-client 在这里切的太早了,我认为应该是由 realclient 自行转调。

RealClient extends ClusetClient,如果是 follower 在 sub 时还是调用 RealClient 的 sub,在这个函数里再去调用 ClusetClient 的 sub 去连 leader。

没看懂为啥不能 delegate 普通函数?

如果是同步的 API,根本没法代理的,因为我们是通过 tcp 连接做 invoke,所以势必只能是异步的 API 才能 work。

@popomore

最后这个 SDKClient 能给个具体的例子吗?

上面那个 SDKClient 的 class 就是例子。当然还可以进一步精简,例如提供一个基类,转调 subscribe 和 publish 可以自动完成。

@gxcsoccer

所以归根到底的问题还是 follower 做 sub 没法缓存数据,get 就取不到了
Shawn notifications@github.com于2017年2月6日 周一21:53写道:

没看懂为啥不能 delegate 普通函数?

如果是同步的 API,根本没法代理的,因为我们是通过 tcp 连接做 invoke,所以势必只能是异步的 API 才能 work。

@popomore https://github.com/popomore


You are receiving this because you were mentioned.

Reply to this email directly, view it on GitHub
https://github.com/eggjs/egg/issues/322#issuecomment-277687037, or mute
the thread
https://github.com/notifications/unsubscribe-auth/AAWA1ZtxvVg247bOAeMyHvHApq5Geuvuks5rZyXkgaJpZM4L4EqW
.

是的

不仅仅有函数的问题,还有一些需求,我们想提供事件、以及某些public 属性,只通过现在的 ClusterClient 还是挺难做到的。

感觉我说的方式才能真正解决问题
Shawn notifications@github.com于2017年2月6日 周一21:57写道:

不仅仅有函数的问题,还有一些需求,我们想提供事件、以及某些public 属性,只通过现在的 ClusterClient 还是挺难做到的。


You are receiving this because you were mentioned.

Reply to this email directly, view it on GitHub
https://github.com/eggjs/egg/issues/322#issuecomment-277688031, or mute
the thread
https://github.com/notifications/unsubscribe-auth/AAWA1cW_DJQm_FBeX32MAGG61BJnjkfJks5rZya5gaJpZM4L4EqW
.

这样另外一个好处是,agent 和 app worker 中 client 的 API 真正达到了完全的一致,因为访问的都是 SDKClient。

  • ClusterClient 提供了优雅的方式抹平数据访问 API
  • APIClient 抹平最终 API

内部的 antvip 实际上用的是这个模式做到 API 完全抹平(主要是这个模块的同步 API 计算逻辑太多,不是简单在两个进程里都搞一些就搞的定的),当时因为没有广泛使用的概念抽象,理解起来反而复杂了。所以想试试结合现在的 cluster-client,如果能推广开就更好了。

@popomore 你说的那个方式其实和我的比较类似,你是想暴露的是 realclient 给用户使用,内部转调 clusterClient,或者 extend。但是这样有个问题,realclient 里的实现不好做,需要考虑当前跑在哪里,sub 和 pub 这块以及 invoke 的函数设计都不好做了。或者你可以举个例子,写写伪代码看看。

@shaoshuai0102 你的这个例子,ClusterClient 貌似不需要修改什么,就是外边再做一次封装

class APIClient extends Base {
  constructor(options) {
    super(options);

    this._client = cluster(RealClient).create({});
    this._client.ready(() => this.ready(true));

    this._cache = {};

    // config.subMap:
    // {
    //   foo: reg1,
    //   bar: reg2,
    // }
    for (const key in config.subMap) {
      this.subscribe(onfig.subMap[key], value => {
        this._cache[key] = value;
      });
    }
  }

  subscribe(reg, listener) {
    this._client.subscribe(reg, listener);
  }

  publish(reg) {
    this._client.publish(reg);
  }

  get(key) {
    return this._cache[key];
  }
}

是的。cluster-client 主体不修改的,但定位变了,本来是用来解决全部问题,包括抹平 API,现在变成只解决数据同步方面的 API。最终的用的是 APIClient(内部转调 ClusterClient)。整体方案可以内置到 cluster-client 模块中,以 cluster(RealClient, APIClient) 的方式来提供服务。

是不是可以约定 cluster-client 封装的都是纯接口的 client (不带业务逻辑,不包含缓存等等),然后外边再对其进行业务封装

是的。也可以说只做数据操作。

举个简单的例子,比如 configclient,以前我们会对订阅到的数据进行处理后再吐给业务,现在我们就将它拆分成两部分,一部分就是纯粹的订阅、发布 api 实现,这部分可以很容易的被 cluster-client 封装,另外一份就是对它的进一步封装

是的是的

有点 meteorjs 同步数据的感觉? worker 是前端调用,agent 是后端 rpc 代理 。

我的想法是 cluster 基类的 API 和 net 的一致,比如调用 sub 时 cluster 来判断是否调用远程 server 还是自己实现。如果难实现可以开放一个函数让客户端来实现自己的 sub

@popomore 写一下例子解释下你的想法吧 没 get 到。

明天写个,不在电脑旁

实例代码 @shaoshuai0102 @gxcsoccer

const { BaseClient, SUBSCRIBE } = require('cluster-client');

class Client extends BaseClient {
  get() {
    // 获取缓存
  }

  subscribe(...args) {
    // 做些 hack,缓存订阅代码

    // 父类判断是否为 leader
    // leader 调用 client 端的实现
    // follower 转调 leader 的客户端的实现。
    super.subscribe(...args);
  }

  // 实现本客户端的 sub
  // 这里我不是很清楚能否抽象,最差就是客户端自己实现
  [SUBSCRIBE]() {
  }
}
  1. BaseClient 提供 sub,pub,invoke 等方法,并提供 symbol 指定客户端可以实现哪些方法。
  2. egg 不需要内置 cluster-client,使用 cluster-client 的插件才有用。我这里不确定 cluster-client 包的唯一性
  3. 如果是 leader,[Leader]Client.subscribe -> [Leader]BaseClient.subscribe -> [Leader]Client.SUBSCRIBE
  4. 如果是 follower,[Follower]Client.subscribe -> [Follower]BaseClient.subscribe -> [Leader]Client.SUBSCRIBE

举个简单的例子,比如 configclient,以前我们会对订阅到的数据进行处理后再吐给业务,现在我们就将它拆分成两部分,一部分就是纯粹的订阅、发布 api 实现,这部分可以很容易的被 cluster-client 封装,另外一份就是对它的进一步封装

@gxcsoccer @shaoshuai0102 如果是这样的话,我觉得这个功能不应该由 configclient 来实现,应该有一个基于 configclient 的 client 来实现,它不需要 cluster-client 模式,在每个 worker 都初始化一个实例即可。

drm 为例,分成2个 client:drm-raw-client, drm-client,drm-raw-client 走 cluster-client 实现 sub,然后 drm-client 包装 drm-raw-client,实现 sub 和 get。

哦,看来就是 @shaoshuai0102 正文描述的方案。。。我多余了。

我上线写的思路和 @shaoshuai0102 差不多,只是不再需要 clusterclient 来启动了,只需要在基类的构造函数初始化就可以了。

@guangao 提的方案是打算把 APIClient 和 RealClient 合并成一个 Client,这个 Client 既做包含缓存的 API,也包含实际的网络交互(ClusterClient 的一些细节)。对于 egg 来说应该说是更方便的,可以写一个客户端就搞定了。(sub 的抽象细节有些不确定到底能不能这样做到)

但是一旦这样实现后,如果是非 egg 体系的场景就不能用这个客户端了。如果是我说的那个方案,RealClient + APIClient 组合仍然可以拿出去在非 egg 场景使用。从这个角度看,应该更灵活些。

不知道非 egg 体系是指哪些,我们现在基本是强约定,不建立在一定知识基础上的话是很难理解这个模块的。

如果不走 agent 模式,可以让客户端都是 leader,并且关闭 server,这样客户端调用的方法就是自己实现的。

这个本身也是为多进程模型设计的,不完全依赖于 egg

@shaoshuai0102 更新下结论吧?

在写

讨论结论:

1. 开发 DataClient

  • 模块开发者开发 DataClient(原来的 RealClient),只做异步 API,负责和远端服务通讯。
  • 一般只有 publish subscribe 和 generator/promise API
  • 开发这个客户端时无需了解多进程知识,只和服务端交互
// DataClient
const Base = require('sdk-base');

class DataClient extends Base {
  constructor(options) {
    super(options);
    this.ready(true);
  }

  subscribe(info, listener) {
    // 向服务端 subscribe
  }

  publish(info) {
    // 向服务端 publish
  }

  * getData(id) {}
}

2. 包装出 clusterClient

cluster-client 模块保持不变,通过 cluster(DataClient).create() 生成支持多进程的 ClusterClient

const cluster = require('cluster-client');
const clusterClient = cluster(DataClient).create();

3. 开发 APIClient

  • 对于有数据缓存等同步 API 需求的模块,再额外封装一个 APIClient
  • 用户使用到的是这个 Client 的实例
  • 异步数据获取,通过调用 ClusterClient 的 API 实现。
  • 由于 clusterClient 的 API 已经被抹平了多进程差异,所以在开发 APIClient 时,也无需关心多进程知识。

例如增加带缓存的 get 同步方法:

const cluster = require('cluster-client');
const DataClient = require('./data_client');

class APIClient extends Base {
  constructor(options) {
    super(options);

    this._client = (options.cluster || cluster)(DataClient).create(options);

    this._client.ready(() => this.ready(true));

    this._cache = {};

    // config.subMap:
    // {
    //   foo: reg1,
    //   bar: reg2,
    // }
    for (const key in config.subMap) {
      this.subscribe(onfig.subMap[key], value => {
        this._cache[key] = value;
      });
    }
  }

  subscribe(reg, listener) {
    this._client.subscribe(reg, listener);
  }

  publish(reg) {
    this._client.publish(reg);
  }

  get(key) {
    return this._cache[key];
  }
}

4. 模块向外暴露 API

module.exports = APIClient;

5. plugin 开发

// app.js || agent.js
const Client = require('client-module'); // 上面那个模块
module.exports = app => {
  const config = app.config.client;
  app.client = new APIClient(Object.assign({}, config, { cluster: app.cluster.bind(this) });
  app.beforeStart(function* () {
    yield app.client.ready();
  });
};

附:

|------------------------------------------------|
| SDKClient                                      |
|       |----------------------------------------|
|       | clusterClient                          |
|       |      |---------------------------------|
|       |      | DataClient                      |
|-------|------|---------------------------------|

下面这样是不是简化一点:

  • plugin 中
// app.js || agent.js
const APIClient = require('client-module');
module.exports = app => {
  const config = app.config.client;
  app.client = new APIClient(Object.assign({}, config, { cluster: app.cluster });
};
  • APIClient
const cluster = require('cluster-client');
const DataClient = require('./data_client');

class APIClient extends Base {
  constructor(options) {
    super(options);
    this._client = (options.cluster || cluster).create(DataClient).create(options);
    // ...
  }

不过这样写还有点不对的地方,就是要看 app.cluster 是否可以封装一下了。。不然就只能按 @shaoshuai0102 的形式。

做一下 bind 就可以了吧

 app.client = new APIClient(Object.assign({}, config, { cluster: app.cluster.bind(app) });

反正意思差不多,最好是插件只需要传递一个 app.cluster 给 APIClient,APIClient 用这个 app.cluster 替换掉 ClusterClient

我改一下

根据 @dead-horse 的建议做了简化,更新到上面的回复了

this._client = (options.cluster || cluster)(DataClient).create(options);

updated

@shaoshuai0102 可以更新到 https://eggjs.org/zh-cn/advanced/cluster.html 文档了

ZookeeperClient 就是一个开源的例子

仓库在那里 没找到

有时间我包一下

@fengmk2 zk 基于哪个你上面说的模块还是基于 https://github.com/dannycoates/zkjs

我看 zkjs 最后一个mr 是你的

@shaoshuai0102 请问"由于 clusterClient 的 API 已经被抹平了多进程差异"这句话中 “被抹平了多进程差异”具体指的什么?

@shaoshuai0102 请问"由于 clusterClient 的 API 已经被抹平了多进程差异"这句话中 “被抹平了多进程差异”具有指的什么?

@gxcsoccer 可以帮解释下

@miser

@shaoshuai0102 这句话的意应该就是说 clusterclient 帮你封装了进程间交互的问题,让你感知不到多进程模式的存在

我写了几次,都没法测出多进程。
process.pid打印出来,都是agent.js下的进程ID。

definition worker
app/worker/worker1.js

worker1.js
    class Work1 extends eggWorker{
        async dosomething1(){
            console.log(`${process.pid } this is process work 1`)
        }

        async dosomething2(params){
            //dosomething2 
            return xxxx
        }
    }

use
await this.app.worker.worker1.dosomething1();
const result = this.app.worker.worker1.dosomething2(xxxx);
这是我觉得,用户最方便使用多进程的初级版本。
更好点,要在 dosomething1内,还能通过this.service.xxx.xxx来调用service类定义的方法。

可以考虑在框架层面抹平多进程差异,比如在Service层面抹平多进程差异,所有Service都在一个独立Worker中运行

Was this page helpful?
0 / 5 - 0 ratings

Related issues

popomore picture popomore  ·  59Comments

fengmk2 picture fengmk2  ·  44Comments

kkys4bfgp75be9p picture kkys4bfgp75be9p  ·  36Comments

zrxisme picture zrxisme  ·  44Comments

V2
popomore picture popomore  ·  32Comments