Kafka producer Interceptor
I am trying to add an interceptor to validate the messages published by the producer to a Kafka topic. I need to do a few validations in addition to the schema validation that is performed by the Kafka topic. The steps I have followed are as follows.
- I have written a Java class implementing the `ProducerInterceptor` interface.
- Compiled the class and created a jar file, which is placed in a folder included in the classpath.
- Added `interceptors.classes = classname` to producer.properties inside the Kafka installation.
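For comparison, KIP-42 defines the producer setting as `interceptor.classes` (singular). As far as I know, a producer created in application code only sees the configuration passed to its constructor, and the `producer.properties` file in the Kafka installation is read only by tools that are explicitly pointed at it. A minimal Java sketch of supplying the setting programmatically, assuming a hypothetical interceptor class `com.example.ValidatingInterceptor` and a broker at `localhost:9092` (neither appears in the question):

```java
import java.util.Properties;

final class ProducerInterceptorConfig {

    // Builds the producer settings as plain key/value pairs.
    static Properties producerProps(String bootstrapServers, String interceptorClass) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Singular "interceptor": the key name defined by KIP-42.
        props.setProperty("interceptor.classes", interceptorClass);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and class name, for illustration only.
        Properties props = producerProps("localhost:9092", "com.example.ValidatingInterceptor");
        System.out.println(props.getProperty("interceptor.classes"));
    }
}
```

The resulting `Properties` would then be handed to `new KafkaProducer<>(props)` in an application that has kafka-clients on its classpath.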
But when I publish a message to the topic, the custom interceptor class I have written is not invoked. (I am not getting any errors either; messages are published to the topic without problems.)
I have referred to https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
Solution 1:[1]
This question is pretty old, so I assume you found a solution in the meantime. However, just in case it helps someone else, I found that my `ProducerInterceptor` class, which dispatches messages to different topics based on the contents of the message, was not invoked unless my stream already had a specified output.
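My first attempt looked something like this, because I thought I didn't need to specify an output topic. This does NOT work (as far as I can tell, a topology with no output never sends anything through the producer, so the interceptor is never called):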
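    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.kstream.KStreamBuilder

    // Source only, no output topic
    val builder: KStreamBuilder = new KStreamBuilder
    val input = builder.stream("input-topic")
    val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
    stream.start()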
But this does:
    // Same stream, but with an output topic declared
    val builder: KStreamBuilder = new KStreamBuilder
    val input = builder.stream("input-topic").through("dummy-output-topic")
    val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
    stream.start()
It's worth noting that nothing gets published to that `dummy-output-topic` in the second example, and that using `to` instead of `through` also appears to work the same way.
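The examples do not show how `streamsConfigWithProducerInterceptor` is built. One way it might look, sketched in Java as plain properties: this assumes a Kafka Streams version that honours the `producer.` prefix for settings meant only for the embedded producer, and the application id, broker address, and interceptor class name are hypothetical.

```java
import java.util.Properties;

final class StreamsInterceptorConfig {

    // Builds Streams settings that register an interceptor on the embedded producer.
    static Properties streamsProps(String appId, String bootstrapServers, String interceptorClass) {
        Properties props = new Properties();
        props.setProperty("application.id", appId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumption: the "producer." prefix scopes the setting to the
        // producer, so the consumer does not try to load the same class.
        props.setProperty("producer.interceptor.classes", interceptorClass);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        Properties props = streamsProps("interceptor-demo", "localhost:9092",
                "com.example.DispatchingInterceptor");
        System.out.println(props.getProperty("producer.interceptor.classes"));
    }
}
```

Properties like these could be passed to `KafkaStreams` directly or wrapped in a `StreamsConfig` first.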
In my case, I was invoking `map` to change the records before using the interceptor to dispatch them to different topics, so my code actually looks more like this:
    // CustomKeyValueMapper is my own mapper class (not shown here)
    val builder: KStreamBuilder = new KStreamBuilder
    val input = builder.stream("input-topic")
      .map(new CustomKeyValueMapper)
      .through("dummy-output-topic")
    val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
    stream.start()
I hope those examples help anyone working with `ProducerInterceptor`s who made the same mistake I did.
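The dispatching logic itself is not shown above. As a rough illustration only, the content-based choice of topic could be kept in a small helper that the interceptor's `onSend` calls before returning a new `ProducerRecord` for the chosen topic. The prefixes and topic names below are hypothetical, not the ones used in the real interceptor.

```java
final class TopicRouter {

    // Picks a destination topic from the message contents.
    // The "order:" / "payment:" prefixes and the topic names are made up.
    static String topicFor(String value, String defaultTopic) {
        if (value == null) {
            return defaultTopic;
        }
        if (value.startsWith("order:")) {
            return "orders-topic";
        }
        if (value.startsWith("payment:")) {
            return "payments-topic";
        }
        return defaultTopic;
    }

    public static void main(String[] args) {
        // Inside onSend, the result would be used roughly as:
        //   new ProducerRecord<>(topicFor(record.value(), record.topic()),
        //                        record.key(), record.value())
        System.out.println(topicFor("order:42", "dummy-output-topic"));
    }
}
```

Keeping the rule in a plain method like this makes it testable without a running broker.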
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Emma Burrows |