Spring reactive: conditional retry with exponential backoff

Using Spring's reactive WebClient, I consume an API, and when the response has a 500 status I need to retry with exponential backoff. But the Mono class doesn't seem to have a retryBackoff variant that takes a Predicate as an input parameter.

This is the kind of method I'm looking for:

public final Mono<T> retryBackoff(Predicate<? super Throwable> retryMatcher, long numRetries, Duration firstBackoff)

Right now my implementation is as follows (it retries, but without any backoff mechanism):

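client.sendRequest()
    // check the type before casting, so unrelated errors are neither cast nor retried
    .retry(e -> e instanceof RestClientException
            && ((RestClientException) e).getStatus() == 500)
    .subscribe();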


Solution 1:[1]

You might want to have a look at the reactor-extra module in the reactor-addons project. In Maven you can do:

<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
    <version>3.2.3.RELEASE</version>
</dependency>

And then use it like this:

client.post()
    .syncBody("test")
    .retrieve()
    .bodyToMono(String.class)
    .retryWhen(Retry.onlyIf(ctx -> ctx.exception() instanceof RestClientException)
                    .exponentialBackoff(firstBackoff, maxBackoff)
                    .retryMax(maxRetries))
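
For intuition about how firstBackoff and maxBackoff interact, here is a minimal plain-Java sketch of a doubling schedule capped at a maximum. It is not reactor-extra's implementation: it ignores jitter, and BackoffSchedule and delayFor are names made up for this illustration.

```java
import java.time.Duration;

class BackoffSchedule {
    // Hypothetical helper: delay before retry number `attempt` (0-based),
    // doubling from firstBackoff and never exceeding maxBackoff.
    static Duration delayFor(int attempt, Duration firstBackoff, Duration maxBackoff) {
        Duration delay = firstBackoff;
        for (int i = 0; i < attempt; i++) {
            delay = delay.multipliedBy(2);
            if (delay.compareTo(maxBackoff) >= 0) {
                return maxBackoff; // cap reached; stop doubling (also avoids overflow)
            }
        }
        return delay.compareTo(maxBackoff) > 0 ? maxBackoff : delay;
    }

    public static void main(String[] args) {
        Duration first = Duration.ofMillis(100);
        Duration max = Duration.ofSeconds(1);
        // Waits grow as 100, 200, 400, 800 ms and then stay at the 1000 ms cap.
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("retry " + attempt + " waits "
                    + delayFor(attempt, first, max).toMillis() + " ms");
        }
    }
}
```

With a small maxBackoff the cap is hit after only a few retries, so maxRetries rather than the doubling decides how long the whole sequence can take.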

Solution 2:[2]

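Retry.onlyIf (from reactor-extra) is now deprecated/removed; reactor-core ships its own reactor.util.retry.Retry, which retryWhen accepts directly.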

If anyone is interested in the up-to-date solution:

client.post()
      .syncBody("test")
      .retrieve()
      .bodyToMono(String.class)
      .retryWhen(Retry.backoff(maxRetries, minBackoff)
          // filter receives the Throwable itself, not a retry context
          .filter(throwable -> throwable instanceof WebClientResponseException
              && ((WebClientResponseException) throwable).getStatusCode().value() == 500))
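
The filter above boils down to a Predicate<Throwable>. Here is a minimal sketch of such a predicate that runs without Spring on the classpath; ApiException is a made-up stand-in for whatever exception your client actually throws. Checking the type before casting means an unrelated error (a timeout, say) is simply not retried instead of blowing up with a ClassCastException.

```java
import java.util.function.Predicate;

class RetryPredicateSketch {
    // Hypothetical stand-in for the client's exception type; not a Spring class.
    static class ApiException extends RuntimeException {
        final int status;

        ApiException(int status) {
            super("HTTP " + status);
            this.status = status;
        }
    }

    // Retry only when the failure is an ApiException carrying status 500.
    static final Predicate<Throwable> IS_HTTP_500 =
            t -> t instanceof ApiException && ((ApiException) t).status == 500;

    public static void main(String[] args) {
        System.out.println(IS_HTTP_500.test(new ApiException(500)));               // true
        System.out.println(IS_HTTP_500.test(new ApiException(404)));               // false
        System.out.println(IS_HTTP_500.test(new IllegalStateException("timeout"))); // false
    }
}
```

Keeping the predicate in a named constant also makes it easy to unit test separately from the retry wiring.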

It's worth mentioning that once the retries are exhausted, retryWhen fails with a RetryExhaustedException that has the source exception as its cause. If you want to 'restore' the source exception, you can use the reactor.core.Exceptions utility:

.onErrorResume(throwable -> {
    if (Exceptions.isRetryExhausted(throwable)) {
        throwable = throwable.getCause();
    }
    return Mono.error(throwable);
})
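
To see why getCause() gives you back the original error, here is a plain blocking sketch of the same idea, not Reactor's code. ExhaustedException and callWithRetry are hypothetical names: the loop retries only matching failures, doubles the pause each time, and wraps the last failure when it gives up.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

class RetryExhaustedSketch {
    // Hypothetical wrapper, standing in for the exception emitted when retries run out.
    static class ExhaustedException extends RuntimeException {
        ExhaustedException(int retries, Throwable lastFailure) {
            super("Retries exhausted after " + retries + " retries", lastFailure);
        }
    }

    // Blocking retry loop: retries only failures matching `retryable`,
    // doubles the pause each time, and wraps the last failure when it gives up.
    static <T> T callWithRetry(Supplier<T> call, int maxRetries,
                               long firstBackoffMillis, Predicate<Throwable> retryable) {
        long backoff = firstBackoffMillis;
        for (int retries = 0; ; retries++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (!retryable.test(e)) throw e; // not retryable: propagate as-is
                if (retries == maxRetries) throw new ExhaustedException(retries, e);
                sleep(backoff);
                backoff *= 2;
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
    }

    public static void main(String[] args) {
        try {
            callWithRetry(() -> { throw new IllegalStateException("HTTP 500"); }, 2, 10, t -> true);
        } catch (ExhaustedException wrapped) {
            Throwable original = wrapped.getCause(); // same move as throwable.getCause() above
            System.out.println(original.getMessage()); // prints "HTTP 500"
        }
    }
}
```

In Reactor itself, RetryBackoffSpec also has onRetryExhaustedThrow, which lets you choose the exception to emit (for example the last failure) rather than unwrapping it afterwards.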

Solution 3:[3]

I'm not sure which Spring version you are using; in 2.1.4 I have this:

client.post()
    .syncBody("test")
    .retrieve()
    .bodyToMono(String.class)
    .retryBackoff(numretries, firstBackoff, maxBackoff, jitterFactor);

... so that's exactly what you want, right?

Solution 4:[4]

I'm currently trying it with Kotlin Coroutines + Spring WebFlux:

It seems the following is not working:

suspend fun ClientResponse.asResponse(): ServerResponse =
    status(statusCode())
        .headers { headerConsumer -> headerConsumer.addAll(headers().asHttpHeaders()) }
        .body(bodyToMono(DataBuffer::class.java), DataBuffer::class.java)
        .retryWhen(
            Retry.onlyIf { ctx: RetryContext<Throwable> -> (ctx.exception() as? WebClientResponseException)?.statusCode in retryableErrorCodes }
                .exponentialBackoff(ofSeconds(1), ofSeconds(5))
                .retryMax(3)
                .doOnRetry { log.error("Retry for {}", it.exception()) }
        )
        .awaitSingle()

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Johan
Solution 2 pixel
Solution 3 Frischling
Solution 4 Robert W.