How to write a Kafka Avro Serde for GenericData.Record

I'm using Kafka 0.10.2 and Avro for the serialization of my messages, both for the key and for the value data. Now I would like to use Kafka Streams, but I'm stuck trying to write the Serde class for GenericData.Record.

import java.util.Collections;
import java.util.Map;

import org.apache.avro.generic.GenericData.Record;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
[...]

public final class KafkaAvroSerde implements Serde<Record> {

    private final Serde<Record> inner;

    public KafkaAvroSerde() {
        // Here I get the error
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
    }

    public KafkaAvroSerde(SchemaRegistryClient client) {
        this(client, Collections.emptyMap());
    }

    public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) {
        // Here I get the error
        inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props));
    }

    @Override
    public Serializer<Record> serializer() {
        return inner.serializer();
    }

    @Override
    public Deserializer<Record> deserializer() {
        return inner.deserializer();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.serializer().configure(configs, isKey);
        inner.deserializer().configure(configs, isKey);
    }

    @Override
    public void close() {
        inner.serializer().close();
        inner.deserializer().close();
    }

}

This is the error I'm getting at the commented lines:

Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record>
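From what I can tell, the mismatch comes from KafkaAvroSerializer implementing Serializer&lt;Object&gt; (and KafkaAvroDeserializer implementing Deserializer&lt;Object&gt;), so Serdes.serdeFrom infers Serde&lt;Object&gt;. The only thing that compiles for me so far is a double cast (just a sketch of what I mean; it leaves only an unchecked warning, but it feels wrong):

    @SuppressWarnings("unchecked")
    public KafkaAvroSerde() {
        // Upcast to a wildcard first, then force the type parameter we actually want.
        inner = Serdes.serdeFrom(
                (Serializer<Record>) (Serializer<?>) new KafkaAvroSerializer(),
                (Deserializer<Record>) (Deserializer<?>) new KafkaAvroDeserializer());
    }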

I need to define the Serde class for GenericData.Record (and not for a specific POJO of mine) because I can have different object structures on the same channel, so the deserializer should return GenericData.Record instances (and I will populate the right POJOs after this step).

How would you get this done? Thank you.



Solution 1:[1]

You already asked this question on the Confluent mailing list. Here is a summary of the answer I posted there.

We just finished work on an official Confluent Avro serde (specific Avro + generic Avro) for Kafka Streams. See https://github.com/confluentinc/schema-registry/tree/master/avro-serde.

The new Avro serde, which is aware of and compatible with Confluent Schema Registry, will be released with the upcoming Confluent 3.3, which is a few weeks out.

Until 3.3 is released, you can either build your own artifacts from the master branch (first build the master branches of confluentinc/common and confluentinc/rest-utils with mvn install, then build the schema-registry project with mvn install), or simply copy-paste the classes into your own code project.

Note: the master branches in the projects above and below are development branches, i.e. pre-release branches. Future readers of this answer should keep this in mind.

We also have examples of how to use the new, upcoming Confluent Avro serde. You can find the demos in the master branch of https://github.com/confluentinc/examples.
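To sketch what usage will look like (a minimal sketch: the GenericAvroSerde package and the schema.registry.url config key are taken from the avro-serde module linked above, and the registry URL below is a placeholder):

import java.util.Collections;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

// The new serde is typed to GenericRecord, which is exactly what you ask for.
final Serde<GenericRecord> valueSerde = new GenericAvroSerde();

// Tell the serde where Schema Registry lives (placeholder URL).
final Map<String, String> serdeConfig = Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081");

// `false` configures it as a value serde; pass `true` for a key serde.
valueSerde.configure(serdeConfig, false);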

Solution 2:[2]

OK, I think I did it. I followed this example:

https://github.com/JohnReedLOL/kafka-streams/blob/master/src/main/java/io/confluent/examples/streams/TopArticlesLambdaExample.java

I used the GenericAvroSerde class from that repository, which produces GenericRecord objects that I can then work with.
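Roughly, my setup now looks like this (a sketch; the topic name and the schema name are placeholders from my own project, the GenericAvroSerde import matches where the class lives in that example project, and the serde still has to be configured with the Schema Registry URL as in Solution 1):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import io.confluent.examples.streams.utils.GenericAvroSerde;

// Kafka 0.10.2 Streams API
final KStreamBuilder builder = new KStreamBuilder();
final Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();

final KStream<String, GenericRecord> stream =
        builder.stream(Serdes.String(), genericAvroSerde, "my-input-topic");

stream.foreach((key, record) -> {
    // Different object structures share the channel, so I dispatch
    // on the record's schema name before mapping to the right POJO.
    final String schemaName = record.getSchema().getFullName();
    if ("com.example.MyEvent".equals(schemaName)) {
        // populate the matching POJO from the GenericRecord's fields here
    }
});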

I hope this will be useful for other people.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: author not listed in the source
[2] Solution 2: gvdm