How to count duplicated messages of Idempotent Consumer with a metric?
I have an Apache Camel application with an Idempotent Consumer. I need a metric with the total number of duplicated messages. How could I implement such a metric?
Code
@SpringBootApplication
public class TestApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestApplication.class, args);
    }

    @Bean
    public MicrometerRoutePolicyFactory micrometerRoutePolicyFactory() {
        return new MicrometerRoutePolicyFactory();
    }

    @Bean
    public EndpointRouteBuilder route() {
        return new EndpointRouteBuilder() {
            @Override
            public void configure() throws Exception {
                from(file("d:/tmp/camel/"))
                    .idempotentConsumer(jsonpath("$.id"), MemoryIdempotentRepository.memoryIdempotentRepository())
                    .to(file("d:/tmp/copy/"));
            }
        };
    }
}
Research
I looked into MicrometerConstants, but I couldn't find a metric for duplicate messages.
Question
How can I count the number of duplicate messages for Idempotent Consumer with a metric?
Solution 1:[1]
I found a workaround, see Idempotent Consumer:
How to handle duplicate messages in the route
Available as of Camel 2.8
You can now set the skipDuplicate option to false, which instructs the idempotent consumer to route duplicate messages as well. However, the duplicate message has been marked as duplicate by having a property on the Exchange set to true. We can leverage this fact by using a Content Based Router or Message Filter to detect this and handle duplicate messages.
For example, in the following example we use the Message Filter to send the message to a duplicate endpoint, and then stop routing that message.
Filter duplicate messages
from("direct:start")
    // instruct idempotent consumer to not skip duplicates as we will filter them ourselves
    .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
    .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
        // filter out duplicate messages by sending them to someplace else and then stop
        .to("mock:duplicate")
        .stop()
    .end()
    // and here we process only new messages (no duplicates)
    .to("mock:result");
and MicrometerBuilders#micrometer:
default MicrometerEndpointBuilderFactory.MicrometerEndpointBuilder micrometer(String path)
Micrometer (camel-micrometer)
Collect various metrics directly from Camel routes using the Micrometer library.
Category: monitoring
Since: 2.22
Maven coordinates: org.apache.camel:camel-micrometer
Syntax: micrometer:metricsType:metricsName
Path parameter: metricsType (required) Type of metrics. There are 6 enums and the value can be one of: COUNTER, GAUGE, LONG_TASK_TIMER, TIMER, DISTRIBUTION_SUMMARY, OTHER
Path parameter: metricsName (required) Name of metrics
Path parameter: tags Tags of metrics
Parameters:
path - metricsType:metricsName
Returns:
the dsl builder
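To make the metricsType:metricsName syntax concrete, here is a minimal plain-Java sketch of how such a path splits into a type and a name. It is an illustration only, not Camel's own parser: the class MicrometerPathSketch, the holder MetricPath, and the fallback to COUNTER when no type is given are assumptions made for this example.

```java
import java.util.Locale;
import java.util.Set;

final class MicrometerPathSketch {

    // The six type names listed in the endpoint documentation quoted above.
    static final Set<String> TYPES = Set.of(
            "COUNTER", "GAUGE", "LONG_TASK_TIMER", "TIMER", "DISTRIBUTION_SUMMARY", "OTHER");

    // Hypothetical holder for the two parts of the path.
    static final class MetricPath {
        final String type;
        final String name;

        MetricPath(String type, String name) {
            this.type = type;
            this.name = name;
        }

        @Override
        public String toString() {
            return type + ":" + name;
        }
    }

    static MetricPath parse(String path) {
        int colon = path.indexOf(':');
        if (colon < 0) {
            // Assumption for this sketch: a path without a type is treated as a counter.
            return new MetricPath("COUNTER", path);
        }
        String type = path.substring(0, colon).toUpperCase(Locale.ROOT);
        if (!TYPES.contains(type)) {
            throw new IllegalArgumentException("Unknown metrics type: " + type);
        }
        return new MetricPath(type, path.substring(colon + 1));
    }

    public static void main(String[] args) {
        System.out.println(parse("counter:duplicate_messages"));
        System.out.println(parse("duplicate_messages"));
    }
}
```

Under that assumption, both calls in main describe the same counter, which is why a path such as "duplicate_messages" can be written without an explicit type.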
My modified code:
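@Bean
public EndpointRouteBuilder route() {
    return new EndpointRouteBuilder() {
        @Override
        public void configure() throws Exception {
            from(file("d:/tmp/camel/"))
                .idempotentConsumer(jsonpath("$.id"), MemoryIdempotentRepository.memoryIdempotentRepository())
                // route duplicates too, so that they can be counted
                .skipDuplicate(false)
                // the duplicate flag is an exchange property, not a header
                .filter(exchangeProperty(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
                    // count the duplicate, then stop routing it
                    .to(micrometer("duplicate_messages").increment("1"))
                    .stop()
                .end()
                // only new messages (no duplicates) reach this point
                .to(file("d:/tmp/copy/"));
        }
    };
}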
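The logic of this route can be modelled without Camel to see what the counter should hold after a run. The following is a minimal plain-Java sketch, not Camel or Micrometer code: DuplicateCounterSketch, process, and the LongAdder standing in for the duplicate_messages counter are hypothetical names chosen for illustration. The HashSet plays the role of the in-memory idempotent repository, since its add method reports whether the id was already present.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;

final class DuplicateCounterSketch {

    // Stand-in for the in-memory idempotent repository: remembers seen ids.
    private final Set<String> seenIds = new HashSet<>();
    // Stand-in for the "duplicate_messages" counter.
    private final LongAdder duplicateMessages = new LongAdder();
    // Stand-in for the target directory: ids of messages routed onward.
    private final List<String> copied = new ArrayList<>();

    void process(String id) {
        // add() returns false when the id is already known, i.e. a duplicate.
        boolean duplicate = !seenIds.add(id);
        if (duplicate) {
            duplicateMessages.increment(); // count the duplicate
            return;                        // and stop routing it
        }
        copied.add(id); // only new messages are copied
    }

    long duplicateCount() {
        return duplicateMessages.sum();
    }

    List<String> copied() {
        return copied;
    }

    public static void main(String[] args) {
        DuplicateCounterSketch route = new DuplicateCounterSketch();
        for (String id : List.of("a", "b", "a", "c", "a")) {
            route.process(id);
        }
        // prints "duplicates=2 copied=[a, b, c]"
        System.out.println("duplicates=" + route.duplicateCount() + " copied=" + route.copied());
    }
}
```

The first occurrence of each id is copied, and every later occurrence only increments the counter, which is the behaviour the metric is meant to expose.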
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |