How to send RPC replyTo - NestJs Rabbitmq
I have a simple NestJS microservice application that listens for messages from a RabbitMQ service.
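import * as dotenv from 'dotenv';
import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { AppModule } from './app.module'; // assumed path to the root module

async function bootstrap() {
  dotenv.config();
  // TODO: validate the environment variables
  const rabbitOptions = {
    urls: [`amqp://${process.env.MQ_HOSTNAME}:${process.env.MQ_PORT}`],
    queue: process.env.TASK_QUEUE_NAME,
    queueOptions: { durable: false },
  };
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: rabbitOptions,
  });
  await app.listen(() => console.log(`listening...`));
}
bootstrap();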
...
@MessagePattern()
async respondQueue(@Payload() payload: AppleDto, @Ctx() context: RmqContext): Promise<any> {
  console.log('received message', payload);
  console.log(context.getArgs()[0].properties);
  return { response: 'wow' };
}
It consumes the message just fine when I publish from the RabbitMQ UI.
But I have been unable to send a reply to the message on a different queue.
I'm not sure whether this is unsupported or I am doing something wrong.
Solution 1:[1]
I've been stuck for days on the very same issue.
The NestJS RabbitMQ microservice only sends the response to the reply_to queue if the payload contains an id field. Otherwise, the response is never sent back to RabbitMQ.
In your case, replace:
{ "data": "apples" }
by:
{ "id": "apples233", "data": "apples" }
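If you publish from your own code rather than from the UI, the same rule can be applied programmatically. Here is a minimal sketch, assuming a Node.js publisher; `withRequestId` is a hypothetical helper of mine, not something NestJS provides. It keeps an existing id and generates one otherwise.

```typescript
import { randomUUID } from 'node:crypto';

// Hypothetical helper (not part of NestJS): makes sure an outgoing payload
// carries the string `id` field that the consumer needs in order to reply.
function withRequestId<T extends object>(payload: T): T & { id: string } {
  const existing = (payload as { id?: unknown }).id;
  // Keep a caller-supplied id; otherwise generate a fresh UUID.
  const id =
    typeof existing === 'string' && existing.length > 0 ? existing : randomUUID();
  return { ...payload, id };
}

// Usage: the original payload from the question, now with an id added.
const message = withRequestId({ data: 'apples' });
console.log(JSON.stringify(message));
```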
To be complete, what I've noticed when observing the two-way communication between a NestJS RabbitMQ producer and a consumer is that the message sent to the queue by the producer is formatted this way:
Properties
    reply_to = amq.rabbitmq.reply-to
    correlation_id = <uuid>
Payload
    {
      "id": "<the same uuid as for the correlation_id property>",
      "pattern": "<the string you'll set in the MessagePattern decorator>",
      "data": {
        ...
      }
    }
The amq.rabbitmq.reply-to value for reply_to triggers RabbitMQ's Direct Reply-To feature, but you can use any other value if you want responses to go to a predefined queue of your choice.
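The message format above could be reproduced by a non-NestJS publisher roughly as follows. This is only a sketch under assumptions: `ChannelLike` is a minimal interface I define here (amqplib's `Channel.sendToQueue` appears to have a compatible shape), `publishRpcRequest` is a hypothetical helper, and the queue names `tasks` and `task_replies` are placeholders. A fake in-memory channel stands in for a real broker connection.

```typescript
import { randomUUID } from 'node:crypto';

type PublishProps = { replyTo: string; correlationId: string };

// Assumed minimal subset of a channel's publish API, defined for this sketch.
interface ChannelLike {
  sendToQueue(queue: string, content: Buffer, options: PublishProps): boolean;
}

// Hypothetical helper: publishes a request whose payload id matches the
// correlation_id property, and whose reply_to names the response queue.
function publishRpcRequest(
  channel: ChannelLike,
  queue: string,
  replyTo: string,
  pattern: string,
  data: unknown,
): string {
  const id = randomUUID();
  const body = JSON.stringify({ id, pattern, data });
  channel.sendToQueue(queue, Buffer.from(body), { replyTo, correlationId: id });
  return id;
}

// Fake channel that records what would have been sent to the broker.
const sent: Array<{ queue: string; body: string; options: PublishProps }> = [];
const fakeChannel: ChannelLike = {
  sendToQueue(queue, content, options) {
    sent.push({ queue, body: content.toString(), options });
    return true;
  },
};

// Placeholder queue names and pattern, for illustration only.
const requestId = publishRpcRequest(fakeChannel, 'tasks', 'task_replies', 'apples', {
  kind: 'apples',
});
console.log(requestId, sent[0].options.replyTo);
```

With a real channel in place of the fake one, the response should arrive on whichever queue `replyTo` names, carrying the same correlation id.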
Solution 2:[2]
I had this problem too, and realized that when creating the client in the module I only passed the queue as a parameter. Logically, I should also provide the replyQueue.
Check out the code below and then my comments.
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { QueueService } from './queue.service';
import { ConfigModule } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    ClientsModule.register([
      {
        name: 'queue-module',
        transport: Transport.RMQ,
        options: {
          urls: [process.env.BROKER_URI!],
          queue: process.env.MESSAGING_DISPATCH_QUEUE!,
        },
      },
    ]),
  ],
  controllers: [],
  providers: [QueueService],
  exports: [QueueService],
})
export class QueueModule {}
I followed the types to see what the client accepts. ClientsModule takes ClientsModuleOptions, which boils down to ClientProvider. A ClientProvider can be a ClientOptions, and for RabbitMQ ClientOptions is RmqOptions, whose options object accepts a replyQueue field. The related RMQ record types from @nestjs/microservices are declared as follows:
export interface RmqRecordOptions {
  expiration?: string | number;
  userId?: string;
  CC?: string | string[];
  mandatory?: boolean;
  persistent?: boolean;
  deliveryMode?: boolean | number;
  BCC?: string | string[];
  contentType?: string;
  contentEncoding?: string;
  headers?: Record<string, string>;
  priority?: number;
  messageId?: string;
  timestamp?: number;
  type?: string;
  appId?: string;
}
export declare class RmqRecord<TData = any> {
  readonly data: TData;
  options?: RmqRecordOptions;
  constructor(data: TData, options?: RmqRecordOptions);
}
export declare class RmqRecordBuilder<TData> {
  private data?;
  private options?;
  constructor(data?: TData);
  setOptions(options: RmqRecordOptions): this;
  setData(data: TData): this;
  build(): RmqRecord;
}
So, if I update my app.module.ts as below, replyQueue takes the value I provide.
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { QueueService } from './queue.service';
import { ConfigModule } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    ClientsModule.register([
      {
        name: 'queue-module',
        transport: Transport.RMQ,
        options: {
          urls: [process.env.BROKER_URI!],
          queue: process.env.MESSAGING_DISPATCH_QUEUE!,
          replyQueue: process.env.MESSAGING_DISPATCH_REPLY_QUEUE!,
        },
      },
    ]),
  ],
  controllers: [],
  providers: [QueueService],
  exports: [QueueService],
})
export class QueueModule {}
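Because the module relies on non-null assertions (`!`), a missing variable would only surface later as an undefined queue name. One way to fail early is a small helper like the one below. This is a sketch, not part of NestJS: `rmqClientOptionsFromEnv` and the `RmqClientOptions` interface are names I made up, and they only mirror the three fields used in the module above.

```typescript
// Mirrors the three fields of the `options` object used in the module above.
interface RmqClientOptions {
  urls: string[];
  queue: string;
  replyQueue: string;
}

// Hypothetical helper: reads the environment variables used above and throws
// if any of them is missing or empty, instead of passing undefined along.
function rmqClientOptionsFromEnv(
  env: Record<string, string | undefined>,
): RmqClientOptions {
  const required = (name: string): string => {
    const value = env[name];
    if (!value) {
      throw new Error(`Missing environment variable: ${name}`);
    }
    return value;
  };
  return {
    urls: [required('BROKER_URI')],
    queue: required('MESSAGING_DISPATCH_QUEUE'),
    replyQueue: required('MESSAGING_DISPATCH_REPLY_QUEUE'),
  };
}
```

In the module, the options entry could then read `options: rmqClientOptionsFromEnv(process.env)`.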
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Frédéric Feytons |
Solution 2 | Sami Salih İbrahimbaş |