Kafka Streams - Concatenate Predicates based on a dynamic number of conditions
I'm fairly new to Java, so I would appreciate advice on handling multiple conditions in Kafka Streams Predicates. With the code below I can build filters from dynamic inputs, but how can I avoid the 'if/else if' chain? I've kept the code deliberately simple so it is easy to see what I'm trying to do. What is the recommended approach for applying as many filters as the user input requires? I would also like to know whether the comparison operators (equals/contains/...) can be chosen based on user input.
public Topology buildTopology(Properties envProps) {
    final StreamsBuilder builder = new StreamsBuilder();
    final String inputTopic = envProps.getProperty("input.topic.name");
    final String streamsOutputTopic = envProps.getProperty("streams.approved.topic.name");
    final String tableOutputTopic = envProps.getProperty("table.output.topic.name");
    final Serde<String> stringSerde = Serdes.String();
    final KStream<String, ClientTask> stream = builder.stream(inputTopic, Consumed.with(stringSerde, StreamsSerdes.ClientTask()));
    String[] Filters = {"State=Work In Progress","Priorit=High"};
    final KStream<String, ClientTask> filter_stream = stream.filter(IsGoodtoGoFilter(Filters));
    filter_stream.to(streamsOutputTopic, Produced.with(stringSerde, StreamsSerdes.ClientTask()));
    return builder.build();
}
public static Predicate<String, ClientTask> IsGoodtoGoFilter(String[] Filters) {
    ArrayList<String> GetColumn = new ArrayList<>();
    ArrayList<String> GetValue = new ArrayList<>();
    for (String FilterColumn : Filters) {
        String[] ColumnAndValue = FilterColumn.split("=");
        GetColumn.add(ColumnAndValue[0]);
        GetValue.add(ColumnAndValue[1]);
    }
    if (GetColumn.size() == 1) {
        return (k, v) -> v.get(GetColumn.get(0)).toString().equals(GetValue.get(0));
    }
    else if (GetColumn.size() == 2) {
        return (k, v) -> v.get(GetColumn.get(0)).toString().equals(GetValue.get(0)) &&
                         v.get(GetColumn.get(1)).toString().equals(GetValue.get(1));
    }
    return (k, v) -> v.getStatus().equals("Open"); // replace by empty predicate later
}
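To make the goal concrete, here is a minimal sketch of one way the size-based branches might be collapsed: start from an always-true predicate and AND one condition per filter in a loop. It is not a confirmed answer, and several parts are assumptions made only for illustration: the class name `DynamicFilters`, the `column:operator:value` filter format, the operator names `equals` and `contains`, and the use of a `Map<String, Object>` as a stand-in for `ClientTask` so the example compiles without Kafka on the classpath. It uses `java.util.function.BiPredicate` rather than the Kafka Streams `Predicate`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

final class DynamicFilters {

    // Hypothetical operator table: maps a user-supplied operator name
    // to a comparison between the actual and the expected value.
    private static final Map<String, BiPredicate<String, String>> OPERATORS = new HashMap<>();
    static {
        OPERATORS.put("equals", String::equals);
        OPERATORS.put("contains", String::contains);
    }

    // Builds one condition from an assumed "column:operator:value" string.
    // Map<String, Object> stands in for ClientTask and its get(String) accessor.
    static BiPredicate<String, Map<String, Object>> single(String filter) {
        String[] parts = filter.split(":", 3); // limit 3 keeps any ':' inside the value
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected column:operator:value but got: " + filter);
        }
        String column = parts[0];
        BiPredicate<String, String> operator = OPERATORS.get(parts[1]);
        if (operator == null) {
            throw new IllegalArgumentException("Unknown operator: " + parts[1]);
        }
        String expected = parts[2];
        return (key, value) -> {
            Object actual = value.get(column);
            // A missing column never matches instead of throwing a NullPointerException.
            return actual != null && operator.test(actual.toString(), expected);
        };
    }

    // ANDs any number of conditions together; an empty array accepts every record.
    static BiPredicate<String, Map<String, Object>> allOf(String[] filters) {
        BiPredicate<String, Map<String, Object>> combined = (key, value) -> true;
        for (String filter : filters) {
            combined = combined.and(single(filter));
        }
        return combined;
    }

    public static void main(String[] args) {
        Map<String, Object> task = new HashMap<>();
        task.put("State", "Work In Progress");
        task.put("Priority", "High");
        String[] filters = {"State:equals:Work In Progress", "Priority:contains:Hi"};
        System.out.println(allOf(filters).test("some-key", task));
    }
}
```

Because the Kafka Streams `Predicate<K, V>` interface declares a single `test(key, value)` method, a combined `BiPredicate` of the matching types could be handed to `stream.filter(...)` as a method reference (`combined::test`), provided the value type is changed from the stand-in map to `ClientTask`.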
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow