Nest: EventBus implementation for Reactor pattern

Created on 17 Jul 2018  ·  7 Comments  ·  Source: nestjs/nest

I'm submitting a...


[ ] Regression 
[ ] Bug report
[x] Feature request
[ ] Documentation issue or request
[ ] Support request => Please do not submit support request here, instead post your question on Stack Overflow.

Current behavior

There is no easy way to call one service's method from another service other than injecting the service and calling the method directly.

Expected behavior


We need a framework for @on('int.echo') / notify('int.echo') style inter-component communication.
This will enable us to fire an event from a controller and have a gateway react to it and send a websocket message to the client.
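
A minimal sketch of the on / notify style requested above, written in plain TypeScript with no framework support. `SimpleEventBus`, `on` and `notify` are hypothetical names chosen for illustration and are not an existing Nest API; only the `int.echo` topic comes from the request.

```typescript
// Hypothetical in-process event bus; not part of Nest.
type Handler<T = unknown> = (payload: T) => void;

class SimpleEventBus {
  private readonly handlers = new Map<string, Set<Handler>>();

  // Register a handler for a topic and return a function that removes it.
  on<T>(topic: string, handler: Handler<T>): () => void {
    const set = this.handlers.get(topic) ?? new Set<Handler>();
    set.add(handler as Handler);
    this.handlers.set(topic, set);
    return () => { set.delete(handler as Handler); };
  }

  // Call every handler registered for the topic; returns how many were registered.
  notify<T>(topic: string, payload: T): number {
    const set = this.handlers.get(topic);
    if (!set) return 0;
    set.forEach(h => h(payload));
    return set.size;
  }
}

const bus = new SimpleEventBus();
const received: string[] = [];

// A gateway-like component subscribes to the topic...
const unsubscribe = bus.on<string>('int.echo', msg => { received.push(msg); });

// ...and a controller-like component fires the event without knowing the subscriber.
const firstCount = bus.notify('int.echo', 'hello');

unsubscribe();
const secondCount = bus.notify('int.echo', 'ignored');
```

The publisher and the subscriber share only the bus and the topic name, which is the decoupling the request is after. A decorator such as @on would need framework support to discover and register handlers, so it is left out of this sketch.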

Minimal reproduction of the problem with instructions

What is the motivation / use case for changing the behavior?


Grails/Spring example

https://objectpartners.com/2015/10/22/asynchronous-programming-in-grails-3/

Environment


Nest version: X.Z


For Tooling issues:
- Node version: XX  
- Platform:  

Others:

All 7 comments

@nestjs/cqrs

Sent from my iPhone

On 17 July 2018, at 22:54, Sumanth Chinthagunta notifications@github.com wrote:


@whtiehack thanks, I need to explore how to make use of it.
My case is simple. Here is what I am planning to implement:
I have a generic WebsocketGateway that keeps track of connected client sockets and their users.

  1. When I have to send a message/object to a user from, e.g., a controller or service, I inject WebsocketGateway and call the send(user, data) method on the gateway.
  2. When WebsocketGateway receives a message from an end user, it publishes the appropriate command based on the message type, so that a handler executes the business logic.
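
The two steps above can be sketched without any framework as follows. `ClientSocket`, `SocketRegistry`, `commandHandlers` and `dispatch` are hypothetical names used only for illustration, and the `'message'` event name and `'chat.send'` message type are assumptions; a real gateway would use the socket type of its websocket library.

```typescript
// Hypothetical minimal socket shape; a real gateway would use its library's socket type.
interface ClientSocket {
  id: string;
  userId?: string;
  emit(event: string, data: unknown): void;
}

class SocketRegistry {
  private clients: ClientSocket[] = [];

  add(client: ClientSocket): void {
    this.clients.push(client);
  }

  remove(client: ClientSocket): void {
    this.clients = this.clients.filter(c => c.id !== client.id);
  }

  // Step 1: push data to every socket that belongs to the given user.
  send(userId: string, data: unknown): number {
    const targets = this.clients.filter(c => c.userId === userId);
    targets.forEach(c => c.emit('message', data));
    return targets.length;
  }
}

// Step 2: route an incoming message to a handler chosen by its type.
type IncomingMessage = { type: string; payload?: unknown };
const commandHandlers = new Map<string, (msg: IncomingMessage) => void>();

function dispatch(msg: IncomingMessage): boolean {
  const handler = commandHandlers.get(msg.type);
  if (!handler) return false; // unknown message type: nothing to run
  handler(msg);
  return true;
}

// Usage with fake sockets that record what they were asked to emit.
const sent: string[] = [];
const fakeSocket = (id: string, userId: string): ClientSocket => ({
  id,
  userId,
  emit: (event, data) => { sent.push(`${id}:${event}:${JSON.stringify(data)}`); },
});

const registry = new SocketRegistry();
registry.add(fakeSocket('s1', 'alice'));
registry.add(fakeSocket('s2', 'bob'));
const deliveredTo = registry.send('alice', { text: 'hi' });

const handled: string[] = [];
commandHandlers.set('chat.send', msg => { handled.push(msg.type); });
const known = dispatch({ type: 'chat.send' });
const unknownType = dispatch({ type: 'missing' });
```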

What about using EventEmitter? It is available from the built-in events module in Node.js.
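
A minimal sketch of that suggestion, assuming Node.js 10 or later (where `off` is available as an alias for `removeListener`). The `int.echo` event name is borrowed from the issue text; the variable names are illustrative.

```typescript
import { EventEmitter } from 'events';

const emitter = new EventEmitter();
const seen: string[] = [];

// Keep a reference to the listener: off() removes only the exact function
// that was passed to on(), so a fresh fn.bind(this) would not match.
const onEcho = (msg: string) => { seen.push(msg); };

emitter.on('int.echo', onEcho);

// Listeners run synchronously, in registration order.
const firstDelivered = emitter.emit('int.echo', 'first');

emitter.off('int.echo', onEcho);

// emit() returns false when the event has no listeners.
const secondDelivered = emitter.emit('int.echo', 'second');
```

Because `emit` is synchronous, an exception thrown by a listener propagates to the caller of `emit`, which is worth keeping in mind when a controller fires events that other services handle.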

I will soon release a couple of modules to help with this, with decorators of course, and an EventEmitter module is one of them.

@shekohex great idea. I will explore today and share my results

I ended up with this implementation. I can now send action objects from the UI to the backend and vice versa.
@shekohex @kamilmysliwiec, looking forward to your decorators to reduce the boilerplate code.
This can be coupled with @amcdnl's ngxs-websocket plugin.
eventbus.gateway.ts

import {
  OnGatewayConnection,
  OnGatewayDisconnect,
  OnGatewayInit,
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer,
} from '@nestjs/websockets';
import {EventEmitter} from 'events';
import {Logger, UseGuards} from '@nestjs/common';
import {ISocket} from './interfaces/socket.interface';
import {Server} from 'socket.io';
import {getActionTypeFromInstance} from '@ngxs/store';
import {AuthService, User, WsAuthGuard} from '../auth';

@WebSocketGateway({ namespace: 'eventbus'})
export class EventBusGateway extends EventEmitter implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  private readonly logger = new Logger(EventBusGateway.name);
  static EVENTS = 'events';
  static ACTIONS = 'actions';

  @WebSocketServer() server: Server;
  clients: ISocket[] = [];

  constructor(private authService: AuthService) {
    super();
  }

  public afterInit(server) {
  }

  public handleConnection(client: ISocket) {
    this.logger.log(`Client connected => ${client.id}  ${client.handshake.query.token}`);
    this.clients.push(client);
  }

  public handleDisconnect(client: ISocket) {
    this.logger.log(`Client disconnected => ${client.id}`);
    this.clients = this.clients.filter(c => c.id !== client.id);
  }

  @UseGuards(WsAuthGuard)
  @SubscribeMessage('auth')
  onAuthenticate(client: ISocket, [data, cb]) {
    const event = 'auth';
    if (cb) {
      // We will be sending via callback here.
      cb({status: 'success'});
    } else {
      return {event, status: 'success'};
    }
  }

  @SubscribeMessage('actions')
  onActions(client: ISocket, action: any) {
    this.logger.log(`test  => ${client.id}  ${client.user.userId}`);
    this.emit(action.type, action);
  }

  public sendActionToUser<T>(user: User, action: any): void {
    const clients = this.getSocketsForUser(user);
    const type = getActionTypeFromInstance(action);
    clients.forEach(socket => socket.emit(EventBusGateway.ACTIONS, { ...action, type }));
  }

  private getSocketsForUser(user: User): ISocket[] {
    return this.clients.filter(c => c.user && c.user.userId === user.userId);
  }
}

notification.service.ts

import {Injectable, OnModuleDestroy, OnModuleInit} from '@nestjs/common';
import {User} from 'auth';
import {EventBusGateway} from 'eventbus';
import {AddNotification, SeenNotification} from 'core';

@Injectable()
export class NotificationService implements OnModuleInit, OnModuleDestroy {
  // Bind each handler once so the same function reference is passed to both
  // on() and off(). Calling bind() again in onModuleDestroy() would create a
  // new function, and off() would leave the original listener registered.
  private readonly onAdd = this.addNotification.bind(this);
  private readonly onSeen = this.seenNotification.bind(this);

  constructor(private readonly eventBus: EventBusGateway) {}

  onModuleInit() {
    this.eventBus.on(AddNotification.type, this.onAdd);
    this.eventBus.on(SeenNotification.type, this.onSeen);
  }

  onModuleDestroy() {
    this.eventBus.off(AddNotification.type, this.onAdd);
    this.eventBus.off(SeenNotification.type, this.onSeen);
  }

  // Push an action to every socket that belongs to the given user.
  sendNotification(user: User, action: any) {
    this.eventBus.sendActionToUser(user, action);
  }

  addNotification(action: AddNotification) {
    console.log('AddNotification', action);
  }

  seenNotification(action: SeenNotification) {
    console.log('SeenNotification', action);
  }
}

notification.controller.ts

import {Controller, Get, HttpCode, HttpStatus, Logger, Req} from '@nestjs/common';
import {CurrentUser} from '../auth';
import {NotificationService} from './notification.service';
import {AddNotification, SeenNotification} from 'core';

@Controller()
export class NotificationController {
  constructor(private readonly nService: NotificationService) {}

  @Get('/notifications')
  @HttpCode(HttpStatus.ACCEPTED)
  notifications(@Req() req, @CurrentUser() user) {
    this.nService.sendNotification(user, new AddNotification('test123'));
    this.nService.sendNotification(user, new SeenNotification('test321'));
  }
}

notification.actions.ts

// Actions
export class AddNotification {
  static readonly type = '[Notification] Add';
  constructor(public readonly payload: any) {}
}

export class SeenNotification {
  static readonly type = '[Notification] Seen';
  constructor(public readonly payload: any) {}
}

Thanks for sharing @xmlking. Another possible solution would be to use rxjs subjects.
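
A minimal sketch of the RxJS alternative, assuming the rxjs package is installed (the gateway above already imports from it). `BusEvent` and `events$` are hypothetical names used only for illustration; `Subject`, `pipe`, `filter`, `subscribe` and `unsubscribe` are standard RxJS APIs.

```typescript
import { Subject } from 'rxjs';
import { filter } from 'rxjs/operators';

// Hypothetical event shape used only for this sketch.
interface BusEvent {
  type: string;
  payload: unknown;
}

// A Subject is both an observer and an observable, so it can act as a bus:
// publishers call next(), subscribers filter for the types they care about.
const events$ = new Subject<BusEvent>();
const echoes: unknown[] = [];

const subscription = events$
  .pipe(filter(e => e.type === 'int.echo'))
  .subscribe(e => { echoes.push(e.payload); });

events$.next({ type: 'int.echo', payload: 'hello' });
events$.next({ type: 'other', payload: 'skipped' });

// After unsubscribe() the subscriber no longer receives events.
subscription.unsubscribe();
events$.next({ type: 'int.echo', payload: 'after unsubscribe' });
```

Compared with EventEmitter, the subscription object removes the need to keep a reference to the original listener function, and operators such as `filter` replace string-keyed listener registration.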

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.
