'Kafka producer Interceptor

I am trying add a Interceptor to do validation for the messages published by the producer to Kafka topic. I need to do few validations in addition to Schema validation which is performed by Kafka topic. Steps I have followed are as follows.

  1. I have written a Java class extending ProducerInterceptor Interface.
  2. Compiled the class and created a jar file which is placed in a folder included in the classpath.
  3. Added interceptors.classes = classname to producer.properties inside Kafka installation.

But when I publish message to the topic the custom interceptor class which I have written is not invoked. (I am not getting any errors also. Messages are published to topic perfectly.)

I haver referred https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors



Solution 1:[1]

This question is pretty old, so I assume you found a solution in the meantime. However, just in case it helps someone else, I found that my ProducerInterceptor class, which dispatches messages to different topics based on the contents of the message, was not invoked unless my stream already had a specified output.

My first attempt looked something like this because I thought I didn't need to specify an output topic. This does NOT work:

val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()

But this does:

val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic").through("dummy-output-topic")

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()

It's worth noting that nothing gets published to that dummy-output-topic in the second example, and that using to instead of through also appears to work the same way.

In my case, I was invoking map to change the records before using the interceptor to dispatch them to different topics, so my code actually looks more like this:

val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
    .map(new CustomKeyValueMapper)
    .through("dummy-output-topic")

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()

I hope those examples help anyone working with ProducerInterceptors who made the same mistake I did.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Emma Burrows