Transform Stream

I have a stream of GenericRecord values deserialised using Avro; the schema has name and age fields.

 KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
         .setBootstrapServers("localhost:9092")
         .setTopics("sucheth1")
         .setGroupId("my-group")
         .setStartingOffsets(OffsetsInitializer.earliest())
         .setValueOnlyDeserializer(
                 ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://localhost:8081"))
         .build();

 DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");


I'm trying to map each record and add a new field 'location' to the stream, but I get an error saying:

Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: location
    at org.apache.avro.generic.GenericData$Record.put(GenericData.java:242)

Is there a way I can transform the stream by adding new fields?



Solution 1:[1]

The issue here is Avro, not Flink. Your transformation (the map function) will need to emit records that use a different schema (one that includes the new field).

The question "Extend Avro schema via Java API by adding one field" may provide some helpful insight.
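
Below is a minimal sketch of such a map step, not taken from the original answer: it builds a second schema that copies the original fields and appends a string field "location", then emits records against that schema. The class name AddLocation, the plain-string type for location, and the "unknown" placeholder value are assumptions for illustration; the Schema.Field constructor used here expects Avro 1.8 or newer.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class AddLocation extends RichMapFunction<GenericRecord, GenericRecord> {

        // Schemas travel as JSON strings; older Avro versions have a non-serializable Schema class.
        private final String inputSchemaJson;
        private final String outputSchemaJson;

        private transient Schema inputSchema;
        private transient Schema outputSchema;

        public AddLocation(Schema inputSchema, Schema outputSchema) {
            this.inputSchemaJson = inputSchema.toString();
            this.outputSchemaJson = outputSchema.toString();
        }

        /** Copy the original fields and append a string field named "location". */
        public static Schema extendWithLocation(Schema original) {
            List<Schema.Field> fields = new ArrayList<>();
            for (Schema.Field f : original.getFields()) {
                // Fields must be copied; an Avro Schema.Field can belong to only one record schema.
                fields.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
            }
            fields.add(new Schema.Field("location", Schema.create(Schema.Type.STRING), null, (Object) null));
            return Schema.createRecord(original.getName(), original.getDoc(),
                    original.getNamespace(), original.isError(), fields);
        }

        @Override
        public void open(Configuration parameters) {
            inputSchema = new Schema.Parser().parse(inputSchemaJson);
            outputSchema = new Schema.Parser().parse(outputSchemaJson);
        }

        @Override
        public GenericRecord map(GenericRecord in) {
            // The output record uses the extended schema, so "location" is a valid field here.
            GenericRecord out = new GenericData.Record(outputSchema);
            for (Schema.Field f : inputSchema.getFields()) {
                out.put(f.name(), in.get(f.name()));
            }
            out.put("location", "unknown"); // placeholder value, just for illustration
            return out;
        }
    }

The extended schema also has to be handed to Flink explicitly, because Flink cannot derive serializer information for GenericRecord on its own; GenericRecordAvroTypeInfo from flink-avro does that:

    Schema extendedSchema = AddLocation.extendWithLocation(schema);

    DataStream<GenericRecord> enriched = stream
            .map(new AddLocation(schema, extendedSchema))
            .returns(new GenericRecordAvroTypeInfo(extendedSchema));

Shipping the schemas as JSON strings and re-parsing them in open() avoids relying on Avro's Schema being Java-serializable, which is only guaranteed in newer Avro releases.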

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 David Anderson