NestJS: How to retry when Kafka consumer fails?
There may be situations where a message is received from Kafka but cannot be processed for some reason (the database is down, a webhook is offline, and so on). I expected that throwing an Error or RpcException would automatically signal that the message needs to be retried. However, that does not happen: Kafka considers the message processed even though handling it failed.
@MessagePattern('hello.world')
readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
  const originalMessage = context.getMessage();
  console.log(originalMessage);
  throw new RpcException('need to retry');
}
Solution 1:[1]
First, create an exception filter for RpcException:
import {
  ArgumentsHost,
  Catch,
  Logger,
  RpcExceptionFilter,
} from '@nestjs/common';
import { RpcException } from '@nestjs/microservices';
import { Observable } from 'rxjs';

@Catch(RpcException)
export class KafkaExceptionFilter implements RpcExceptionFilter<RpcException> {
  private readonly logger = new Logger(KafkaExceptionFilter.name);

  catch(exception: RpcException, host: ArgumentsHost): Observable<any> {
    this.logger.warn(exception);
    // Rethrow instead of returning an error Observable, so the failure
    // propagates to the Kafka consumer rather than being treated as handled.
    throw exception.getError();
  }
}
Then apply @UseFilters(new KafkaExceptionFilter()) to your Kafka controller methods. Note that it does not work when the filter is registered globally.
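To illustrate why rethrowing from the filter can matter, here is a minimal, dependency-free sketch of a consumer loop that only advances its offset when the handler finishes without throwing. It is a simplified model, not the actual NestJS or KafkaJS implementation, and every name in it (Handler, consume, swallowErrors, makeFlakyHandler) is hypothetical. The assumption behind it is that an error caught before it reaches the consumer makes the message look processed, while an error that propagates leads to redelivery.

```typescript
// Simplified model of at-least-once delivery. All names are hypothetical
// and are NOT part of NestJS or KafkaJS.
type Handler = (message: string) => void;

interface ConsumeResult {
  committedOffset: number; // number of messages marked as processed
  attempts: number;        // total handler invocations
}

// Delivers messages in order. The offset advances only when the handler
// returns normally; a thrown error redelivers the same message, up to
// maxRetries extra attempts per message.
function consume(
  messages: string[],
  handler: Handler,
  maxRetries: number,
): ConsumeResult {
  let offset = 0;
  let attempts = 0;
  for (const message of messages) {
    let processed = false;
    for (let retry = 0; retry <= maxRetries && !processed; retry++) {
      attempts++;
      try {
        handler(message);
        processed = true;
      } catch {
        // The error reached the consumer loop: do not commit, try again.
      }
    }
    if (!processed) {
      break; // retries exhausted: stop without committing this message
    }
    offset++; // commit only after a successful attempt
  }
  return { committedOffset: offset, attempts };
}

// Wraps a handler so its errors never reach the consumer loop. This stands
// in for an error being caught and handled before the consumer sees it.
function swallowErrors(handler: Handler): Handler {
  return (message) => {
    try {
      handler(message);
    } catch {
      // Error hidden from the consumer loop.
    }
  };
}

// Builds a handler that throws on its first `failures` calls, then succeeds.
function makeFlakyHandler(failures: number): {
  handler: Handler;
  processed: string[];
} {
  const processed: string[] = [];
  let remaining = failures;
  const handler: Handler = (message) => {
    if (remaining > 0) {
      remaining--;
      throw new Error('need to retry');
    }
    processed.push(message);
  };
  return { handler, processed };
}

// Swallowed errors: both offsets are committed, but nothing was processed.
const lost = makeFlakyHandler(2);
console.log(consume(['a', 'b'], swallowErrors(lost.handler), 3), lost.processed);

// Propagated errors: 'a' is retried until it succeeds, then 'b' is handled.
const retried = makeFlakyHandler(2);
console.log(consume(['a', 'b'], retried.handler, 3), retried.processed);
```

In the first run the two failing deliveries are silently committed and the messages are lost. In the second run the same two failures trigger redelivery, so both messages end up processed. How many times a real consumer retries, and what happens once retries are exhausted, depends on the client configuration.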
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | omidh |