Transform Stream
I have a GenericRecord stream whose values are deserialised with Avro; the schema has two fields, name and age.
KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
.setBootstrapServers("localhost:9092")
.setTopics("sucheth1")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://localhost:8081"))
.build();
DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
I'm trying to map each record and add a new field, 'location', to the stream, but I get the following error:
Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: location
at org.apache.avro.generic.GenericData$Record.put(GenericData.java:242)
Is there a way I can transform the stream by adding new fields?
Solution 1:[1]
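The issue here is Avro, not Flink. A GenericRecord is bound to its schema, so put() rejects any field name that the schema does not declare. Your transformation (the map function) will need to emit records that use a different schema (one that includes the new field).
The Stack Overflow question "Extend Avro schema via Java API by adding one field" may provide some helpful insight.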
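As a rough illustration of that idea, the sketch below derives an extended schema from the original one and copies each record into it. It is not taken from the linked question; the class name AddLocation and both helper methods are hypothetical, 'location' is assumed to be a required string, and the Schema.Field copy constructor assumes Avro 1.9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Hypothetical helper class, for illustration only.
final class AddLocation {

    // Builds a new record schema: all fields of the base schema plus "location".
    static Schema withLocationField(Schema base) {
        List<Schema.Field> fields = new ArrayList<>();
        for (Schema.Field f : base.getFields()) {
            // A Field instance cannot be reused in another schema, so copy it.
            fields.add(new Schema.Field(f, f.schema()));
        }
        // Assumption: location is a plain (non-nullable) string without a default.
        fields.add(new Schema.Field("location", Schema.create(Schema.Type.STRING), "added by the map step"));
        return Schema.createRecord(
                base.getName(), base.getDoc(), base.getNamespace(), base.isError(), fields);
    }

    // Copies every field of the input record into a record of the extended schema,
    // then sets the new field.
    static GenericRecord addLocation(GenericRecord in, Schema extended, String location) {
        GenericRecord out = new GenericData.Record(extended);
        for (Schema.Field f : in.getSchema().getFields()) {
            out.put(f.name(), in.get(f.name()));
        }
        out.put("location", location);
        return out;
    }
}
```

In the Flink job, the map function would call something like addLocation for each record. The mapped stream would probably also need type information for the extended schema, for example by calling returns(new GenericRecordAvroTypeInfo(extendedSchema)) from flink-avro on the result of map; treat that wiring as an assumption to verify against your Flink version.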
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | David Anderson |