'nestjs event-based messaging with 1 producer and 2 consumers

With nestjs microservices, you can send messages and receive the result using request/response-based approach. That is implemented with a combination of @MessagePattern and client.send('my_pattern', myData). For an example see the nest docs: https://docs.nestjs.com/microservices/basics#request-response and https://docs.nestjs.com/microservices/basics#sending-messages.

How do I receive the result in the event-based approach?

Suppose you have a user microservice and an auth microservice. Whenever a user is created you want an auth subject to be created as well (saving the username and a hash of the password, so that the user may login with an api request to the auth microservice instead of the user service).

auth/auth.controller.ts

  @EventPattern('EVT_USER_CREATED')
  public async handleUserCreated(data: any): Promise<AuthSubject> {
    if (!data.username || !data.password) {
      throw new RpcException('Auth subject must supply username and password');
    } 
    const newSubject: CreateAuthSubject = {
      username: data.username,
      password: data.password,
      email: data.email ?? '',
    };
    const sub = await this.subjectService.create(subject);
    return sub;
  }

user/user.controller.ts

  @Post('')
  @ApiBody({ type: CreateUser })
  @ApiCreatedResponse({ type: User })
  public async create(@Body() user: CreateUser): Promise<User> {
    const newUser = await this.userService.create(user);

    this.userQueue
      .emit<any, CreateUser>('EVT_USER_CREATED', user)
      .subscribe((res) => {
        console.log(res);   // undefined
      });

    return newUser;
  }

To verify that there is no error in my setup, I changed @EventPattern to @MessagePattern and this.userQueue.emit<... to this.userQueue.send<.... It worked, i.e. res was a valid auth subject with username and password as expected. However, with the event-based approach outlined in this question res is always undefined (whether or not the auth controllers handleUserCreated returns or throws).

Ultimately I would like to achieve the following: If another microservice needs to process 'EVT_USER_CREATED' events, I just add a @EventPattern('EVT_USER_CREATED') public async handleUserCreated method to its controller. The observable this.userQueue.emit<any, CreateUser>('EVT_USER_CREATED', user) would then receive both results: Once for each of the microservices consuming the user created event.

So suppose I add a third microservice: the customer microservice that is responsible for saving payment information, the history of orders, etc. Like the auth service it subscribes to 'EVT_USER_CREATED'.

customer/customer.controller.ts

  @EventPattern('EVT_USER_CREATED')
  public async handleUserCreated(data: any): Promise<AuthSubject> {    
    const customer = await this.customerService.create(data);
    return customer ;
  }

Now with the above setup the microservices auth and customer will alternate in receiving the events: If the user microservices emits the creation of a user, only the auth service will react to it and create an auth subject from hat user. No customer will be created for that user. For the next user that is created in the user microservice, only a customer will be created but not an auth subject. The third created user will again be consumed by the auth microservice but not by the customer microservice. And so on.

                                           -- auth microservice 
                                         /
user microservice --- message broker ---
                                         \
                                           -- customer microservice

To summarize: How do I achive the messaging architecture shown in the diagram, such that I only need one emit(...) call in the user.controller.ts and, such that I reveive both responses in the subscrption to the emit(...) call?



Solution 1:[1]

It may be a bit late but I leave my contribution for future users.

For this type of architecture strongly based on events, it is advisable to use a messaging broker such as Kafka, with which different services can be subscribed to different topics, and even different services can listen to the same topic, in this way you can react in different ways to the same event.

Kafka also offers many advantages that will be useful when you want to scale your microservices, and as an extra, it has support in Nest. https://docs.nestjs.com/microservices/kafka

What you should keep in mind is that when messaging systems are used, communication is usually completely asynchronous, this is an important detail since, following the case you present, not only is it enough for you to send a message through kafka to another microservice to validate the credentials of a user, but it is necessary to return a response to the client. For this case, the tools offered by nest are limited since using the @MessagePattern decorator, we can receive messages from kafka in our controller, but we cannot wait for a response (of the same topic or another) with the confirmation of success to respond to the Customer request. In this case, it would be convenient for you to create your own transporter or your own kafka client with Kafkajs (https://kafka.js.org/) so that if necessary, you can keep the user's request until you receive a confirmation for another topic.

A solution that I have seen in certain forums is to save an object in memory (key / value) with the "Response" object of a request associated with the id of a user, in this way you can send a message by kafka or any other broker with a specific action, and receive the confirmation by another controller, retrieve the user's "Request" object and then, send the response (Request.send (...)). It is not the ideal solution but it works for some cases.

For example:

@Controller('users')
class MyController {
    constructor(private kafkaClient: KafkaClient) {}
    private users: new Map<string, Request>() // Or private users: any = {}

    onModuleInit() {
        // By default, NestJs create topic "users-topic.reply"
        this.kafkaClient.subscribeToResponseOf('users-topic')
    }

    @Post('auth')
    authUser(@Req req: Request, @Res res: Response): void {
        const userData = req.body;
        const { id } = userData;

        // Save request object in private variable;
        this.users.set(id, res); // or this.users[id] = res;

        // Send kafka message to validate user data
        this.kafkaClient.emit('users-topic')
    }

    @MessagePattern('users-topic.reply')
    authUser(@Payload() data: any): any {
        if (data.message === 'success') {
            const res:Request = this.users.get(data.id); // or this.users[data.id];
            
            // Clean users object
            this.users.remove(data.id); // or this.users[data.id] = null;

            // Response client request
            res.send('Authentication successfully')
        }
    }
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Alejandro Barrios