Can a Kafka consumer filter messages before polling all of them from a topic?
I was told that consumers can only read the whole topic, and that there is no way to evaluate or filter messages on the brokers.
This implies that we have to consume/receive all messages from a topic and filter them on the client side.
That is too much overhead. I was wondering if we can filter and receive only specific types of messages, based on something already passed to the brokers, such as the message keys.
Judging from the method Consumer.poll(timeout), it seems there is nothing extra we can do.
Solution 1:[1]
No, with the Consumer API you cannot receive only some of the messages from a topic. The consumer fetches all messages in order.
If you don't want to filter messages in the consumer, you could use a Kafka Streams job. For example, the Streams job would read from your topic and push to another topic only the messages the consumer is interested in. The consumer can then subscribe to this new topic.
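A minimal sketch of such a Streams job, assuming string keys and values; the topic names, application id, and the key predicate are placeholders, not part of the original answer:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class FilterJob {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "message-filter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("source-topic");
        // Republish only the records the downstream consumer cares about.
        source.filter((key, value) -> "wanted-key".equals(key))
              .to("filtered-topic");

        new KafkaStreams(builder.build(), props).start();
    }
}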
Solution 2:[2]
Each Kafka topic should contain messages that are logically similar, just to stay on topic. Now, it might happen that you have a topic, say fruits, which contains different attributes of a fruit (maybe in JSON format). Producers may push messages for different fruits, but you want one of your consumer groups to process only apples. Ideally you would have gone with one topic per fruit, but let's assume that to be a fruitless endeavor for some reason (maybe too many topics). In that case, the producer can put the fruit name in the message key, and you can override the default partitioning scheme to ignore the key and partition randomly, passing your custom partitioner class through the producer's partitioner.class property. Overriding the partitioner is required because, by default, a message with a key always goes to the same partition, and keying every message by fruit name could cause partition imbalance.
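A sketch of such a key-ignoring partitioner; the class name is hypothetical, only the Partitioner interface and the partitioner.class property come from the answer:

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class KeyIgnoringPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Ignore the key entirely and pick a random partition, so keying
        // every message with the fruit name cannot skew one partition.
        int numPartitions = cluster.partitionCountForTopic(topic);
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

It would be registered in the producer config like so:

props.put("partitioner.class", KeyIgnoringPartitioner.class.getName());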
The idea behind this is that if your Kafka message value is a complex object (JSON, an Avro record, etc.), it can be quicker to filter records by key than to parse the whole value and extract the desired field. I don't have any data to support the performance benefit of this approach, though; it's only an intuition.
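On the consumer side, the key check is then a cheap string comparison done before any value parsing. A minimal sketch, assuming string keys and values; the topic, group id, and process() handler are hypothetical:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AppleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "apple-processors");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("fruits"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Every record is still fetched; only the key is checked
                    // before deciding whether to parse the larger value.
                    if ("apple".equals(record.key())) {
                        process(record.value());
                    }
                }
            }
        }
    }

    static void process(String json) {
        // hypothetical handler: parse and handle the apple record
    }
}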
Solution 3:[3]
Once records have been pushed into the Kafka cluster, there is not much you can do: whatever you want to filter, you will always have to bring the data to the client.
Unfortunately, the only alternative is to move that logic into the producers. That way you can push the data into multiple topics based on whatever logic you define; a sketch follows.
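A minimal sketch of that producer-side routing; the topic names and the isApple() check are placeholders, not from the answer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RoutingProducer {
    private final KafkaProducer<String, String> producer;

    public RoutingProducer(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    // Push each record to a topic chosen by application-defined logic,
    // so each consumer subscribes only to the topic it cares about.
    public void send(String key, String value) {
        String topic = isApple(value) ? "fruits-apples" : "fruits-other";
        producer.send(new ProducerRecord<>(topic, key, value));
    }

    private boolean isApple(String value) {
        return value.contains("\"name\":\"apple\""); // naive placeholder check
    }
}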
Solution 4:[4]
A Kafka consumer will receive all messages from the topic. But if only a custom message type (MyMessage) needs to be consumed, the filtering can be done in the Deserializer class. If the consumer receives two types of messages, say String and MyMessage, the String messages are ignored (deserialized to null) and the MyMessage messages are processed.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyMessageDeserializer implements Deserializer<MyMessage> {

    private static final Logger logger = LoggerFactory.getLogger(MyMessageDeserializer.class);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public MyMessage deserialize(String topic, byte[] data) {
        try {
            if (data == null) {
                logger.info("Null received at deserializing");
                return null;
            }
            // Records that cannot be parsed as MyMessage (e.g. plain strings)
            // fall through to the catch block and come back as null.
            return objectMapper.readValue(data, MyMessage.class);
        } catch (Exception e) {
            logger.error("Deserialization exception: " + e.getMessage());
        }
        return null;
    }
}
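To wire it in, point the consumer's value.deserializer property at this class (the package name here is a placeholder) and have the poll loop skip records whose value() comes back null:

props.put("value.deserializer", "com.example.MyMessageDeserializer");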
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Mickael Maison |
| Solution 2 | |
| Solution 3 | dbustosp |
| Solution 4 | procrastinator |