I want to set expiration in options, but it's not supported right now.
export interface ReadPacket<T = any> {
  pattern: any;
  data: T;
  options: any;
}

protected dispatchEvent(packet: ReadPacket): Promise<any> {
  return new Promise((resolve, reject) =>
    this.channel.sendToQueue(
      this.queue,
      Buffer.from(JSON.stringify(packet)),
      packet.options,
      err => (err ? reject(err) : resolve()),
    ),
  );
}

public emit<TResult = any, TInput = any>(
  pattern: any,
  data: TInput,
  options: any,
): Observable<TResult> {
  if (isNil(pattern) || isNil(data)) {
    return _throw(new InvalidMessageException());
  }
  return defer(async () => this.connect()).pipe(
    mergeMap(() => this.dispatchEvent({ pattern, data, options })),
  );
}
Since the RabbitMQ implementation is using the amqp library, all options should be supported.
If you look at the documentation on https://docs.nestjs.com/microservices/rabbitmq, you'll see queueOptions, which allows you to pass along exclusive, durable, autoDelete, arguments, and also some RabbitMQ options with the x- prefix: messageTtl, expires, deadLetterExchange, maxLength, maxPriority (based on: https://www.squaremobius.net/amqp.node/channel_api.html#assertQueue).
It seems to me, unless I'm misunderstanding, that the solution would be to set up your config as such:
const app = await NestFactory.createMicroservice(ApplicationModule, {
  transport: Transport.RMQ,
  options: {
    urls: [`amqp://localhost:5672`],
    queue: 'cats_queue',
    queueOptions: {
      durable: false,
      // amqplib's `expires` option is sent to RabbitMQ as x-expires;
      // a bare `x-expires` key is not valid TypeScript syntax
      expires: 1000, // expire after 1 second
    },
  },
});
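To make the relationship between the option names and the x- arguments concrete, here is a minimal sketch of the name mapping. The helper `toQueueArguments` and the `QueueLimitOptions` type are hypothetical names made up for this illustration; they are not part of NestJS or amqplib, which performs this translation internally in assertQueue.

```typescript
// Hypothetical subset of the assertQueue options mentioned above.
interface QueueLimitOptions {
  messageTtl?: number;
  expires?: number;
  deadLetterExchange?: string;
  maxLength?: number;
  maxPriority?: number;
}

// RabbitMQ argument name for each option.
const ARGUMENT_NAMES: Record<keyof QueueLimitOptions, string> = {
  messageTtl: 'x-message-ttl',
  expires: 'x-expires',
  deadLetterExchange: 'x-dead-letter-exchange',
  maxLength: 'x-max-length',
  maxPriority: 'x-max-priority',
};

// Illustration only: build the `arguments` table that would be sent
// to the broker for the options that are actually set.
function toQueueArguments(
  options: QueueLimitOptions,
): Record<string, string | number> {
  const args: Record<string, string | number> = {};
  for (const key of Object.keys(options) as (keyof QueueLimitOptions)[]) {
    const value = options[key];
    if (value !== undefined) {
      args[ARGUMENT_NAMES[key]] = value;
    }
  }
  return args;
}
```

Note that these are queue-level settings: they are fixed when the queue is declared, so they apply to every message in that queue.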
@joeyslack Thanks for your reply. Our business case is task notification after a task has expired, so the expiration in queueOptions needs to be dynamic. Currently, once it is set up, it can't be changed.
@JLoveI Good news is that it looks like [options] are also supported for sendToQueue: https://www.squaremobius.net/amqp.node/channel_api.html#channel_sendToQueue
Have you tried passing expires or x-expires?
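For reference, in the amqplib publish options linked above, the per-message TTL is called `expiration` (a string of milliseconds), while `expires` is the queue-level option. Custom headers go under `headers`. A minimal sketch of building such an options object follows; `buildPublishOptions` and `PublishOptionsSketch` are hypothetical names for this example, and only the `expiration`, `headers` and `persistent` fields are taken from the amqplib documentation.

```typescript
// Hypothetical, partial view of amqplib's publish options.
interface PublishOptionsSketch {
  expiration?: string; // per-message TTL in milliseconds, as a string
  headers?: Record<string, unknown>; // application-specific headers
  persistent?: boolean;
}

// Build per-message options from a numeric TTL and optional headers.
function buildPublishOptions(
  ttlMs?: number,
  headers?: Record<string, unknown>,
): PublishOptionsSketch {
  const options: PublishOptionsSketch = {};
  if (ttlMs !== undefined) {
    if (!Number.isInteger(ttlMs) || ttlMs < 0) {
      throw new RangeError('ttlMs must be a non-negative integer');
    }
    options.expiration = String(ttlMs);
  }
  if (headers && Object.keys(headers).length > 0) {
    options.headers = { ...headers }; // copy so the caller's object is not shared
  }
  return options;
}
```

Because these options travel with each message, the TTL can differ from one task notification to the next, which is the dynamic behaviour asked for above.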
@joeyslack The following is the ClientRMQ in nestjs/microservices
export class ClientRMQ extends ClientProxy {
  ...
  protected dispatchEvent(packet: ReadPacket): Promise<any> {
    return new Promise((resolve, reject) =>
      this.channel.sendToQueue(
        this.queue,
        Buffer.from(JSON.stringify(packet)),
        {},
        err => (err ? reject(err) : resolve()),
      ),
    );
  }
}
The options argument is hard-coded to {}, so we can't pass options to sendToQueue.
We should probably provide a way to somehow pass down the options through the ClientProxy. I'll check what we can do here.
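Until something like that exists, the idea can be sketched outside of NestJS. The snippet below is not the ClientRMQ implementation; `ChannelLike`, `ReadPacketWithOptions`, `dispatchEvent` (as a free function) and `RecordingChannel` are hypothetical names for this illustration, and the channel is an in-memory stand-in rather than a real amqplib channel.

```typescript
type PublishOptions = Record<string, unknown>;

// Hypothetical minimal shape of the channel method used by dispatchEvent.
interface ChannelLike {
  sendToQueue(
    queue: string,
    content: Buffer,
    options: PublishOptions,
    callback: (err: Error | null) => void,
  ): void;
}

// Hypothetical packet type with optional per-message options.
interface ReadPacketWithOptions<T = unknown> {
  pattern: unknown;
  data: T;
  options?: PublishOptions;
}

// Forward packet.options to sendToQueue instead of a hard-coded {}.
function dispatchEvent(
  channel: ChannelLike,
  queue: string,
  packet: ReadPacketWithOptions,
): Promise<void> {
  // Keep the options out of the serialized body; they are transport metadata.
  const { options = {}, ...payload } = packet;
  return new Promise<void>((resolve, reject) =>
    channel.sendToQueue(
      queue,
      Buffer.from(JSON.stringify(payload)),
      options,
      err => (err ? reject(err) : resolve()),
    ),
  );
}

// In-memory stand-in that records what would have been published.
class RecordingChannel implements ChannelLike {
  public readonly sent: { queue: string; body: string; options: PublishOptions }[] = [];

  sendToQueue(
    queue: string,
    content: Buffer,
    options: PublishOptions,
    callback: (err: Error | null) => void,
  ): void {
    this.sent.push({ queue, body: content.toString(), options });
    callback(null);
  }
}
```

One design choice here differs from the proposal at the top of the thread: the options are stripped from the JSON body, so consumers only receive pattern and data. Whether that is desirable depends on whether the receiving side needs to see the options.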
Yes, we need this to add our own headers to the messages.
Any news on this? I'd like to pass header options to ClientProxy.send().
I'd like to set a header option too, to be used with ClientProxy.send() for a RabbitMQ queue :)
The MQTT client proxy can provide the underlying MQTT.js client instance, where the publish can be used with any of the library arguments. Here's my adaptation until NestJS supports it natively:
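import { Inject, Injectable } from '@nestjs/common';
import { ClientMqtt } from '@nestjs/microservices';
import { IClientPublishOptions, Packet } from 'mqtt';
import { bindNodeCallback, Observable } from 'rxjs';

// MQTT_CLIENT is the application's own injection token for the registered MQTT client.
@Injectable()
export class MqttService {
  private readonly publishObservable: (
    topic: string,
    message: string | Buffer,
    opts?: IClientPublishOptions,
  ) => Observable<Packet>;

  constructor(@Inject(MQTT_CLIENT) mqttClient: ClientMqtt) {
    // Get the underlying MQTT.js client and wrap its callback-style publish.
    const client = mqttClient.createClient();
    this.publishObservable = bindNodeCallback(client.publish).bind(client);
  }

  publish(
    topic: string,
    message: Record<any, any> | string,
    options: IClientPublishOptions,
  ): Observable<Packet> {
    return this.publishObservable(topic, JSON.stringify(message), options);
  }
}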
You can use a similar approach for subscribe, although that should not be of much use anymore.