'How to write a KafkaAvro Serde for GenericData.Record
I'm using Kafka 0.10.2 and Avro for the serialization of my messages, both for the key and for the value data.
Now I would like to use Kafka Streams but I'm stuck trying to write the Serde
class for the GenericData.Record
class.
import org.apache.avro.generic.GenericData.Record;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
[...]
public final class KafkaAvroSerde implements Serde<Record> {
private final Serde<Record> inner;
public KafkaAvroSerde() {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(), new KafkaAvroDeserializer());
}
public KafkaAvroSerde(SchemaRegistryClient client) {
this(client, Collections.emptyMap());
}
public KafkaAvroSerde(SchemaRegistryClient client, Map<String, ?> props) {
// Here I get the error
inner = Serdes.serdeFrom(new KafkaAvroSerializer(client, props), new KafkaAvroDeserializer(client, props));
}
@Override
public Serializer<Record> serializer() {
return inner.serializer();
}
@Override
public Deserializer<Record> deserializer() {
return inner.deserializer();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.serializer().configure(configs, isKey);
inner.deserializer().configure(configs, isKey);
}
@Override
public void close() {
inner.serializer().close();
inner.deserializer().close();
}
}
This is the error I'm getting at the commented lines
Type mismatch: cannot convert from Serde<Object> to Serde<GenericData.Record>
I need to define the Serde class for the GenericData.Record
(and not for a specific POJO of mine) because I can have different object structures on the same channel, so the deserializer should return me the GenericData
(and I will populate the right POJOs after this step).
How would you make things done? Thank you
Solution 1:[1]
You already asked that question in the Confluent mailing list. Here is the summary of my answer I posted there.
We just finished the work on an official Confluent Avro serde (specific Avro + generic Avro) for Kafka Streams. See https://github.com/confluentinc/schema-registry/tree/master/avro-serde.
The new Avro serde, which is Confluent schema registry aware/compatible, will be released with upcoming Confluent 3.3, which is a few weeks out.
Until 3.3 is released, you can either build your own artifacts from the master
branch (you must first build the master
branches of confluentinc/common and confluentinc/rest-utils with mvn install
, then the schema-registry project with mvn install
), or e.g. copy-paste the classes into your own code project.
Note: The
master
branch in the projects above and below are development branches, i.e. pre-release branches. Future readers of this answer should keep this in mind.
We have also examples on how to use the new, upcoming Confluent Avro serde. You can find the demos in the master
branch of https://github.com/confluentinc/examples.
Solution 2:[2]
Ok, I think I did. I followed this example
I used the GenericAvroSerde class which generated GenericRecord objects, which I can then work with.
I hope this will be useful for other people.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | gvdm |