'Avro Definition for Custom Aggregator

I have a code where I am aggregating the data from Kafka stream via:

StreamsBuilder streamsBuilder = new StreamsBuilder();
    streamsBuilder.table(AppConfigs.topicName,
        Consumed.with(AppSerdes.String(), AppSerdes.Employee()))
        .groupBy((k, v) -> KeyValue.pair(v.getDepartment(), v), Grouped.with(AppSerdes.String(), AppSerdes.Employee()))
        .aggregate(
            //Initializer
            
            //Adder
            
            //Subtractor
            
            //Serializer
            Materialized.<String, DepartmentAggregate, KeyValueStore<Bytes, byte[]>>as(
                AppConfigs.stateStoreName).withValueSerde(AppSerdes.DepartmentAggregate())
        )

I have written the custom Json Serdes for "DepartmentAggregate" as

public class AppSerdes extends Serdes {    

static final class DepartmentAggSerde extends WrapperSerde<DepartmentAggregate> {
    DepartmentAggSerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>());
    }
}

public static Serde<DepartmentAggregate> DepartmentAggregate() {
    DepartmentAggSerde serde = new DepartmentAggSerde();

    Map<String, Object> serdeConfigs = new HashMap<>();
    serdeConfigs.put(JsonDeserializer.VALUE_CLASS_NAME_CONFIG, DepartmentAggregate.class);
    serde.configure(serdeConfigs, false);

    return serde;
}

Now I have a requirement where instead of Json, I have to write the same custome logic for Avro Aggregator(Basically, collect values in a list based on id's). Can someone please help with the custom Avro Aggregator code?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source