'Measure execution time in Project Reactor

I'm trying to load test one of my projects and I think somewhere in my changes I may have introduced some point where the performance is getting impacted.

Is there a way to measure execution time and log in case it exceeds the threshold?

flux
  .<doOnSomethingIsTakingTooLong>(signal -> 
       log.error("SLOW PROCESSING {}", signal), Duration.ofSeconds(2)


Solution 1:[1]

There are several ways to achieve it:

  1. Expose metrics

It's not exactly what you asked, but metrics would be a preferred way to collect such information. Reactor exposes multiple metrics that could help to investigate performance issues. You could use metrics() operator in conjunction with name() & tag() to emit metrics. For more details, see Exposing Reactor metrics

events()
    .flatMap(rec ->
            processEvent(rec)
              .name("process") 
              .metrics() 
    )

You could also enable scheduler metrics Schedulers.enableMetrics();

This project has a good example of monitoring based on Prometheus and Grafana Reactor monitoring demo.

  1. Use context to measure execution time

The idea is to save start time in the context and then calculate execution time when complete event is emitted.

events()
    .flatMap(rec ->
            processEvent(rec)
                 .transform(this::reportExecutionTime)
    );

where reportExecutionTime is implemented like the following

private <T> Mono<T> reportExecutionTime(Mono<T> mono) {
    String taskStartMsKey = "task.start";

    return Mono.deferContextual(ctx -> mono
                    .doOnSuccess(__ -> {
                        var executionTime = currentTimeMillis() - ctx.<Long>get(taskStartMsKey);
                        log.info("execution time: {}", executionTime);
                    })
            )
            .contextWrite(ctx -> ctx.put(taskStartMsKey, currentTimeMillis()));
}

Here is a full test example

@Test
void reportExecutionTimeTest() {
    var stream = events()
            .flatMap(rec ->
                    processEvent(rec)
                            .transform(this::reportExecutionTime)
            );

    StepVerifier.create(stream)
            .expectNextCount(100)
            .verifyComplete();
}

private <T> Mono<T> reportExecutionTime(Mono<T> mono) {
    String taskStartMsKey = "task.start";

    return Mono.deferContextual(ctx -> mono
                    .doOnSuccess(__ -> {
                        var executionTime = currentTimeMillis() - ctx.<Long>get(taskStartMsKey);
                        log.info("execution time: {}", executionTime);
                    })
            )
            .contextWrite(ctx -> ctx.put(taskStartMsKey, currentTimeMillis()));
}

private Flux<Integer> events() {
    return Flux.range(1, 100);
}

private Mono<Integer> processEvent(int i) {
    return Mono.delay(Duration.ofMillis(i + 100))
            .thenReturn(i);
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1