Measure execution time in Project Reactor
I'm load testing one of my projects, and I think one of my changes may have introduced a performance regression.
Is there a way to measure execution time and log when it exceeds a threshold?
flux
    .<doOnSomethingIsTakingTooLong>(signal ->
        log.error("SLOW PROCESSING {}", signal), Duration.ofSeconds(2));
Solution 1:[1]
There are several ways to achieve it:
- Expose metrics
It's not exactly what you asked for, but metrics are the preferred way to collect this kind of information. Reactor exposes several metrics that can help you investigate performance issues. You can use the metrics() operator together with name() and tag() to emit them. For more details, see "Exposing Reactor metrics".
events()
    .flatMap(rec ->
        processEvent(rec)
            .name("process")
            .metrics()
    );
You can also enable scheduler metrics by calling Schedulers.enableMetrics().
The "Reactor monitoring demo" project is a good example of monitoring based on Prometheus and Grafana.
- Use context to measure execution time
The idea is to save the start time in the context and then calculate the execution time when the completion signal is emitted.
events()
    .flatMap(rec ->
        processEvent(rec)
            .transform(this::reportExecutionTime)
    );
where reportExecutionTime is implemented as follows:
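private <T> Mono<T> reportExecutionTime(Mono<T> mono) {
    String taskStartMsKey = "task.start";
    // deferContextual gives the inner pipeline read access to the subscriber context
    return Mono.deferContextual(ctx -> mono
            .doOnSuccess(__ -> {
                var executionTime = System.currentTimeMillis() - ctx.<Long>get(taskStartMsKey);
                log.info("execution time: {}", executionTime);
            })
        )
        // the function runs at subscription time, so the stored value is the start of this subscription
        .contextWrite(ctx -> ctx.put(taskStartMsKey, System.currentTimeMillis()));
}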
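Here is a full test example:
// Assumptions: JUnit 5, reactor-test on the classpath, and an SLF4J logger named log in the enclosing test class.
import java.time.Duration;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@Test
void reportExecutionTimeTest() {
    var stream = events()
        .flatMap(rec ->
            processEvent(rec)
                .transform(this::reportExecutionTime)
        );
    // All 100 events should be emitted, each logging its own execution time
    StepVerifier.create(stream)
        .expectNextCount(100)
        .verifyComplete();
}

private <T> Mono<T> reportExecutionTime(Mono<T> mono) {
    String taskStartMsKey = "task.start";
    return Mono.deferContextual(ctx -> mono
            .doOnSuccess(__ -> {
                var executionTime = System.currentTimeMillis() - ctx.<Long>get(taskStartMsKey);
                log.info("execution time: {}", executionTime);
            })
        )
        // the function runs at subscription time, so the stored value is the start of this subscription
        .contextWrite(ctx -> ctx.put(taskStartMsKey, System.currentTimeMillis()));
}

private Flux<Integer> events() {
    return Flux.range(1, 100);
}

// Simulates work that takes (i + 100) milliseconds
private Mono<Integer> processEvent(int i) {
    return Mono.delay(Duration.ofMillis(i + 100))
        .thenReturn(i);
}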
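The question asks about logging only when a threshold is exceeded, whereas reportExecutionTime logs every execution time. Below is a minimal sketch of the threshold check, written in plain Java so it can be tried without Reactor on the classpath. SlowTaskDetector, elapsedMillis, isSlow, and the injectable clock are hypothetical names introduced for illustration. They are not part of Reactor.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical helper (not a Reactor API): decides whether a task ran longer than a threshold.
final class SlowTaskDetector {
    private final Duration threshold;
    // The clock is injected so the check can be exercised without sleeping;
    // in production code System::currentTimeMillis would be a natural choice.
    private final LongSupplier clockMillis;

    SlowTaskDetector(Duration threshold, LongSupplier clockMillis) {
        this.threshold = threshold;
        this.clockMillis = clockMillis;
    }

    // Milliseconds elapsed between startMillis and the clock's current reading.
    long elapsedMillis(long startMillis) {
        return clockMillis.getAsLong() - startMillis;
    }

    // True only when the elapsed time strictly exceeds the threshold.
    boolean isSlow(long startMillis) {
        return elapsedMillis(startMillis) > threshold.toMillis();
    }
}
```

Inside doOnSuccess, the unconditional log.info call could then be replaced with something like if (detector.isSlow(ctx.<Long>get(taskStartMsKey))) log.error("SLOW PROCESSING"), with detector created as new SlowTaskDetector(Duration.ofSeconds(2), System::currentTimeMillis).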
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |