'How to count duplicated messages of Idempotent Consumer with a metric?

I have an Apache Camel application with an Idempotent Consumer. I need a metric with the total number of duplicated messages. How could I implement such a metric?

Code

@SpringBootApplication
public class TestApplication {

  public static void main(String[] args) {
    SpringApplication.run(TestApplication.class, args);
  }

  @Bean
  public MicrometerRoutePolicyFactory micrometerRoutePolicyFactory() {
    return new MicrometerRoutePolicyFactory();
  }

  @Bean
  public EndpointRouteBuilder route() {
    return new EndpointRouteBuilder() {
      @Override
      public void configure() throws Exception {
          from(file("d:/tmp/camel/"))
             .idempotentConsumer(jsonpath("$.id"), MemoryIdempotentRepository.memoryIdempotentRepository())
             .to(file("d:/tmp/copy/"));
      }
    };
  }
}

Research

I looked into MicrometerConstants, but I couldn't find a metric for duplicate messages.

Question

How can I count the number of duplicate messages for Idempotent Consumer with a metric?



Solution 1:[1]

I found a workaround, see Idempotent Consumer:

How to handle duplicate messages in the route Available as of Camel 2.8

You can now set the skipDuplicate option to false which instructs the idempotent consumer to route duplicate messages as well. However the duplicate message has been marked as duplicate by having a property on the Exchange set to true. We can leverage this fact by using a Content Based Router or Message Filter to detect this and handle duplicate messages.

For example in the following example we use the Message Filter to send the message to a duplicate endpoint, and then stop continue routing that message.

Filter duplicate messages

from("direct:start")
    // instruct idempotent consumer to not skip duplicates as we will filter then our self
    .idempotentConsumer(header("messageId")).messageIdRepository(repo).skipDuplicate(false)
    .filter(property(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
        // filter out duplicate messages by sending them to someplace else and then stop
        .to("mock:duplicate")
        .stop()
    .end()
    // and here we process only new messages (no duplicates)
    .to("mock:result");

and MicrometerBuilders#micrometer:

default MicrometerEndpointBuilderFactory.MicrometerEndpointBuilder micrometer(String path)

Micrometer (camel-micrometer) Collect various metrics directly from Camel routes using the Micrometer library. Category: monitoring Since: 2.22 Maven coordinates: org.apache.camel:camel-micrometer Syntax: micrometer:metricsType:metricsName Path parameter: metricsType (required) Type of metrics There are 6 enums and the value can be one of: COUNTER, GAUGE, LONG_TASK_TIMER, TIMER, DISTRIBUTION_SUMMARY, OTHER Path parameter: metricsName (required) Name of metrics Path parameter: tags Tags of metrics

Parameters:
path - metricsType:metricsName

Returns:
the dsl builder

My modified code:

@Bean
public EndpointRouteBuilder route() {
  return new EndpointRouteBuilder() {
    @Override
    public void configure() throws Exception {
       from(file("d:/tmp/camel/"))
         .idempotentConsumer(jsonpath("$.id"), MemoryIdempotentRepository.memoryIdempotentRepository())
           .skipDuplicate(false)
         .filter(header(Exchange.DUPLICATE_MESSAGE).isEqualTo(true))
           .to(micrometer("duplicate_messages").increment("1"))
           .stop()
         .end()
         .to(file("d:/tmp/copy/"));
    }
  };
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1