'How to send RPC replyTo - NestJs Rabbitmq

I have a simple NestJs Microservice application that is listening for messages from a Rabbitmq service.

async function bootstrap() {
  dotenv.config();
  // TODO: Check for ENV valid
  const rabbitOptions = {
    urls: [`amqp://${process.env.MQ_HOSTNAME}:${process.env.MQ_PORT}`],
    queue: process.env.TASK_QUEUE_NAME,
    queueOptions: { durable: false },
  };
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: rabbitOptions,
  });
  await app.listen(() => console.log(`listening...`));
}
bootstrap();

...

@MessagePattern()
async respondQueue(@Payload() payload: AppleDto, @Ctx() context: RmqContext): Promise<any> {
  console.log('received message', payload);
  console.log(context.getArgs()[0].properties);
  return { response: 'wow' };
}

It consumes the message just fine when I publish from the Rabbitmq UI.

But I have been unsuccessful in replying back to the message on a different queue.

enter image description here

Not sure if this is unsupported or I am doing something wrong.



Solution 1:[1]

I've been stuck for days on the very same issue. The NestJS RabbitMQ microservice will only send the response back to the reply_to queue if the payload contains an id field. Otherwise, the response is never sent back to RabbitMQ.

In your case, replace:

{ "data": "apples" }

by:

{ "id": "apples233", "data": "apples" }

To be complete, what I've noticed when observing the two-way communication between a NestJS RabbitMQ producer and a consumer is that the message sent to the queue by the producer is formatted this way:

Properties

reply_to = amq.rabbitmq.reply-to
correlation_id = <uuid>

Payload

{
  "id": "<the same uuid as for the correlation_id property>",
  "pattern": "<the string you'll set in the MessagePattern decorator>",
  "data":
  {
    ...
  }
}

The amq.rabbitmq.reply-to value for reply_to triggers the Direct Reply-To feature of RabbitMQ, but you can put whatever other value if you want reponses to go into a pre-defined queue of your choice.

Solution 2:[2]

I was having this problem and it occurred to me that when I was creating the client in the module, I gave the queue as a parameter. Logically, I should also provide the replyQueue.

Check out the code below and then my comments.

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { QueueService } from './queue.service';
import { ConfigModule } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    ClientsModule.register([
      {
        name: 'queue-module',
        transport: Transport.RMQ,
        options: {
          urls: [process.env.BROKER_URI!],
          queue: process.env.MESSAGING_DISPATCH_QUEUE!,
        },
      },
    ]),
  ],
  controllers: [],
  providers: [QueueService],
  exports: [QueueService],
})
export class QueueModule {}

I did a little interface journey. Gets ClientsModuleOptions inside ClientsModule. ClientsModuleOptions is actually ClientProvider. ClientProvider can be ClientOptions. ClientOptions is RmqOptions in case of Rabbitmq. And here it is, below the RmqOptions:

export interface RmqRecordOptions {
    expiration?: string | number;
    userId?: string;
    CC?: string | string[];
    mandatory?: boolean;
    persistent?: boolean;
    deliveryMode?: boolean | number;
    BCC?: string | string[];
    contentType?: string;
    contentEncoding?: string;
    headers?: Record<string, string>;
    priority?: number;
    messageId?: string;
    timestamp?: number;
    type?: string;
    appId?: string;
}
export declare class RmqRecord<TData = any> {
    readonly data: TData;
    options?: RmqRecordOptions;
    constructor(data: TData, options?: RmqRecordOptions);
}
export declare class RmqRecordBuilder<TData> {
    private data?;
    private options?;
    constructor(data?: TData);
    setOptions(options: RmqRecordOptions): this;
    setData(data: TData): this;
    build(): RmqRecord;
}

So, if I update my app.module.ts as below, the replyQueue will be my value.

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { QueueService } from './queue.service';
import { ConfigModule } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    ClientsModule.register([
      {
        name: 'queue-module',
        transport: Transport.RMQ,
        options: {
          urls: [process.env.BROKER_URI!],
          queue: process.env.MESSAGING_DISPATCH_QUEUE!,
          replyQueue: process.env.MESSAGING_DISPATCH_REPLY_QUEUE!,
        },
      },
    ]),
  ],
  controllers: [],
  providers: [QueueService],
  exports: [QueueService],
})
export class QueueModule {}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Frédéric Feytons
Solution 2 Sami Salih ?brahimba?