Nest: Pass options alongside the message/event (client proxy)

Created on 30 Sep 2019  路  9Comments  路  Source: nestjs/nest

Feature Request

Is your feature request related to a problem? Please describe.


I want to set expiration in options, but it's not supported right now.

Describe the solution you'd like

export interface ReadPacket<T = any> {
  pattern: any;
  data: T;
  options:any
}

 protected dispatchEvent(packet: ReadPacket): Promise<any> {
    return new Promise((resolve, reject) =>
      this.channel.sendToQueue(
        this.queue,
        Buffer.from(JSON.stringify(packet)),
        packet.options,
        err => (err ? reject(err) : resolve()),
      ),
    );
  }

  public emit<TResult = any, TInput = any>(
    pattern: any,
    data: TInput,
    options:any
  ): Observable<TResult> {
    if (isNil(pattern) || isNil(data)) {
      return _throw(new InvalidMessageException());
    }
    return defer(async () => this.connect()).pipe(
      mergeMap(() => this.dispatchEvent({ pattern, data, options })),
    );
  }

Teachability, Documentation, Adoption, Migration Strategy

What is the motivation / use case for changing the behavior?

microservices type

Most helpful comment

We should probably provide a way to somehow pass down the options through the ClientProxy. I'll check what we can do here.

All 9 comments

Since the RabbitMQ implementation is using the amqp library, all options should be supported.

If you look at the documentation on https://docs.nestjs.com/microservices/rabbitmq, you'll see queueOptions which allows you to pass along exclusive, durable, autoDelete, arguments, and also some RabbitMQ options with the x- prefix: messageTtl, expires, deadLetterExchange, maxLength, maxPriority. (based on: https://www.squaremobius.net/amqp.node/channel_api.html#assertQueue).

It seems to me, unless I'm misunderstanding, would be to setup your config as such:

const app = await NestFactory.createMicroservice(ApplicationModule, {
  transport: Transport.RMQ,
  options: {
    urls: [`amqp://localhost:5672`],
    queue: 'cats_queue',
    queueOptions: { 
        durable: false,
        x-expires: 1000, // expire after 1 second
    },
  },
});

@joeyslack Thanks for your reply, since our business is such like task notification after it's expired, so the queueOptions(expiration) should be dynamic,currently once we setup, it can't be changed

@JLoveI Good news is that it looks like [options] are also supported for sendToQueue: https://www.squaremobius.net/amqp.node/channel_api.html#channel_sendToQueue

Have you tried passing expires or x-expires?

@joeyslack The following is the ClientRMQ in nestjs/microservices

export class ClientRMQ extends ClientProxy {
 ...
protected dispatchEvent(packet: ReadPacket): Promise<any> {
    return new Promise((resolve, reject) =>
      this.channel.sendToQueue(
        this.queue,
        Buffer.from(JSON.stringify(packet)),
        {},
        err => (err ? reject(err) : resolve()),
      ),
    );
  }
}

ca't pass options in sendToQueue.

We should probably provide a way to somehow pass down the options through the ClientProxy. I'll check what we can do here.

We should probably provide a way to somehow pass down the options through the ClientProxy. I'll check what we can do here.

Yes we need this to add own Headers to the Msgs.

any news on this? I'd like to pass on header options (ClientProxy.send())

I'd like to put a header option too ! To be used with the ClientProxy.send() for a RabbitMQ queue :)

The MQTT client proxy can provide the underlying MQTT.js client instance, where the publish can be used with any of the library arguments. Here's my adaptation until NestJS supports it natively:

@Injectable()
export class MqttService {

  private readonly publishObservable: (topic: string, message: string | Buffer, opts?: IClientPublishOptions) => Observable<Packet>;

  constructor(@Inject(MQTT_CLIENT) mqttClient: ClientMqtt) {
    const client = mqttClient.createClient();
    this.publishObservable = bindNodeCallback(client.publish).bind(client);
  }

  publish(
    topic: string,
    message: Record<any, any> | string,
    options: IClientPublishOptions,
  ): Observable<Packet> {
    return this.publishObservable(topic, JSON.stringify(message), options);
  }
}

You can use a similar approach for subscribe although that should not be much useful anymore.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

JulianBiermann picture JulianBiermann  路  3Comments

cdiaz picture cdiaz  路  3Comments

rafal-rudnicki picture rafal-rudnicki  路  3Comments

KamGor picture KamGor  路  3Comments

mishelashala picture mishelashala  路  3Comments