Avro Definition for Custom Aggregator
I have code that aggregates data from a Kafka stream as follows:
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.table(AppConfigs.topicName,
        Consumed.with(AppSerdes.String(), AppSerdes.Employee()))
    // Re-key each employee record by its department
    .groupBy((k, v) -> KeyValue.pair(v.getDepartment(), v),
        Grouped.with(AppSerdes.String(), AppSerdes.Employee()))
    .aggregate(
        //Initializer
        //Adder
        //Subtractor
        //Serializer
        Materialized.<String, DepartmentAggregate, KeyValueStore<Bytes, byte[]>>as(
            AppConfigs.stateStoreName).withValueSerde(AppSerdes.DepartmentAggregate())
    );
I have written a custom JSON Serde for "DepartmentAggregate" as follows:
public class AppSerdes extends Serdes {
    static final class DepartmentAggSerde extends WrapperSerde<DepartmentAggregate> {
        DepartmentAggSerde() {
            super(new JsonSerializer<>(), new JsonDeserializer<>());
        }
    }

    public static Serde<DepartmentAggregate> DepartmentAggregate() {
        DepartmentAggSerde serde = new DepartmentAggSerde();
        Map<String, Object> serdeConfigs = new HashMap<>();
        // Tell the JSON deserializer which class to materialize values into
        serdeConfigs.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG, DepartmentAggregate.class);
        serde.configure(serdeConfigs, false); // false = value serde, not key serde
        return serde;
    }
}
Now I have a requirement to implement the same custom logic with Avro instead of JSON (basically, collect values in a list based on IDs). Can someone please help with the custom Avro aggregator code?
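For reference, here is a minimal sketch of the "collect values in a list per key" logic described above, written without any Kafka or Avro dependency so it can be run on its own. The Employee and Aggregate classes are hypothetical stand-ins for Avro-generated classes (the real field names are not shown in the question, so id and employeeIds are assumptions). The three static methods have the same shape as the initializer, adder, and subtractor passed to aggregate() on a grouped table; this is only an illustration of the logic, not a confirmed solution.

```java
import java.util.ArrayList;
import java.util.List;

public class DepartmentAggregation {

    // Hypothetical stand-in for the Avro-generated Employee class.
    static final class Employee {
        private final String id;
        private final String department;

        Employee(String id, String department) {
            this.id = id;
            this.department = department;
        }

        String getId() { return id; }
        String getDepartment() { return department; }
    }

    // Hypothetical stand-in for an Avro-generated DepartmentAggregate whose
    // schema would declare an array field holding the collected employee ids.
    static final class Aggregate {
        final List<String> employeeIds;

        Aggregate(List<String> employeeIds) {
            this.employeeIds = employeeIds;
        }
    }

    // Initializer: start every department with an empty list.
    static Aggregate init() {
        return new Aggregate(new ArrayList<>());
    }

    // Adder: called with (key, newValue, currentAggregate) when a record
    // enters the group. Returns a new aggregate instead of mutating the old one.
    static Aggregate add(String department, Employee employee, Aggregate agg) {
        List<String> ids = new ArrayList<>(agg.employeeIds);
        if (!ids.contains(employee.getId())) {
            ids.add(employee.getId());
        }
        return new Aggregate(ids);
    }

    // Subtractor: called with (key, oldValue, currentAggregate) when a record
    // leaves the group, for example because the employee changed department.
    static Aggregate subtract(String department, Employee employee, Aggregate agg) {
        List<String> ids = new ArrayList<>(agg.employeeIds);
        ids.remove(employee.getId());
        return new Aggregate(ids);
    }

    public static void main(String[] args) {
        Aggregate agg = init();
        agg = add("eng", new Employee("1", "eng"), agg);
        agg = add("eng", new Employee("2", "eng"), agg);
        agg = subtract("eng", new Employee("1", "eng"), agg);
        System.out.println(agg.employeeIds); // prints [2]
    }
}
```

In the topology above, these would be supplied as lambdas in place of the //Initializer, //Adder, and //Subtractor placeholders, and the value Serde in Materialized would be swapped for an Avro Serde for the generated DepartmentAggregate class. Assuming Confluent Schema Registry is in use, SpecificAvroSerde configured with schema.registry.url is one option.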
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow