Deserialize Avro from Kafka as SpecificRecord failing: Expecting type to be a PojoTypeInfo
I am using Flink v1.11.2 and Avro v1.10.1.
I am trying to deserialize an Avro record from a Kafka topic as a SpecificRecord, but for some reason I keep getting an "Expecting type to be a PojoTypeInfo" error.
I was able to output it as a generic record using this:
```java
FlinkKafkaConsumer<GenericRecord> eventsConsumer = new FlinkKafkaConsumer<>(
        fsiProcessorProps.getKafkaEventsInput(),
        AvroDeserializationSchema.forGeneric(Sde.getClassSchema()),
        properties);
```
I am now trying:
```java
FlinkKafkaConsumer<Sde> eventsConsumer = new FlinkKafkaConsumer<>(
        fsiProcessorProps.getKafkaEventsInput(),
        AvroDeserializationSchema.forSpecific(Sde.class),
        properties);
```
but I get the following error when I try to start the job:
```
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
... 7 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Expecting type to be a PojoTypeInfo
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
... 9 more
Caused by: java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:74)
at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:56)
at org.apache.flink.formats.avro.AvroDeserializationSchema.getProducedType(AvroDeserializationSchema.java:168)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.getProducedType(KafkaDeserializationSchemaWrapper.java:66)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.getProducedType(FlinkKafkaConsumerBase.java:1066)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2172)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1608)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1569)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1551)
at FsiProcessor.main(FsiProcessor.java:46)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 12 more
```
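The innermost cause is thrown from `AvroTypeInfo.generateFieldsFromAvroSchema`, which (as far as I can tell from Flink's sources) analyzes the generated class with Flink's POJO type extraction and throws this exception when the class does not qualify as a POJO. `forGeneric` does not go through this path, which would be consistent with the generic version working. The sketch below is a hypothetical, standalone reflection check (`PojoRulesCheck` is an illustrative name, not a Flink class) that roughly approximates Flink's documented POJO rules: a public, standalone class with a public no-argument constructor, whose non-static, non-transient fields are either public or have a public getter and setter. The name matching (case-insensitive, underscores ignored) is an assumption, not Flink's exact logic.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class PojoRulesCheck {

    /** Returns reasons why clazz probably fails Flink's POJO rules; an empty list means none were found. */
    public static List<String> violations(Class<?> clazz) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(clazz.getModifiers())) {
            problems.add("class is not public");
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(clazz.getModifiers())) {
            problems.add("class is a non-static inner class");
        }
        try {
            clazz.getConstructor(); // only finds a *public* no-argument constructor
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        // Check instance fields declared on the class and on its superclasses
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mod = f.getModifiers();
                if (f.isSynthetic() || Modifier.isStatic(mod) || Modifier.isTransient(mod) || Modifier.isPublic(mod)) {
                    continue; // ignored by the rules, or directly accessible
                }
                if (!hasAccessor(clazz, f, true) || !hasAccessor(clazz, f, false)) {
                    problems.add("field '" + f.getName() + "' is not public and lacks a public getter/setter pair");
                }
            }
        }
        return problems;
    }

    // Simplified name matching: case-insensitive, underscores ignored (an assumption, not Flink's exact code)
    private static String norm(String s) {
        return s.toLowerCase(Locale.ROOT).replace("_", "");
    }

    // Looks for a public getter (getter == true) or setter (getter == false) matching field f
    private static boolean hasAccessor(Class<?> clazz, Field f, boolean getter) {
        String field = norm(f.getName());
        for (Method m : clazz.getMethods()) { // public methods, including inherited ones
            String name = norm(m.getName());
            if (getter) {
                boolean named = name.equals("get" + field)
                        || (name.equals("is" + field) && (f.getType() == boolean.class || f.getType() == Boolean.class));
                if (named && m.getParameterCount() == 0 && m.getReturnType().equals(f.getType())) {
                    return true;
                }
            } else if (name.equals("set" + field) && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].equals(f.getType()) && m.getReturnType() == void.class) {
                return true;
            }
        }
        return false;
    }

    /** Hypothetical example class that satisfies the rules above. */
    public static class GoodPojo {
        public String name;
        private int count;
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }

    /** Hypothetical example class with no no-arg constructor and a getter-only field. */
    public static class BadPojo {
        private long id;
        public BadPojo(long id) { this.id = id; }
        public long getId() { return id; }
    }

    public static void main(String[] args) {
        System.out.println("GoodPojo: " + violations(GoodPojo.class)); // prints "GoodPojo: []"
        System.out.println("BadPojo: " + violations(BadPojo.class));
    }
}
```

With the Avro-generated classes on the classpath, calling `PojoRulesCheck.violations(Sde.class)` may point at a field whose getter or setter does not line up with the field type. Flink's real extractor applies further checks, so an empty result does not guarantee that `AvroTypeInfo` will accept the class.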
The Sde class is a typical Avro-generated class, so I am not sure what I am missing. Also, the data does not have the schema embedded (I tried with the schema embedded as well and got the same error).
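In the trace, the exception is raised from `getProducedType` while `main` calls `addSource`, i.e. before any Kafka record is read, so the message framing is unlikely to be the cause of this particular error. It does matter once records are deserialized: as far as I know, Flink's `AvroDeserializationSchema` expects bare Avro binary with no header. The hypothetical helper below (`AvroFramingSniffer` is an illustrative name) guesses the framing from the first bytes, using the two headers defined in the Avro specification: object container files ("Obj" plus 0x01, with the full schema in the file header) and single-object encoding (0xC3 0x01 plus an 8-byte schema fingerprint). It is a heuristic only, since bare binary carries no marker and could coincidentally start with these bytes.

```java
public class AvroFramingSniffer {

    public enum Framing { OBJECT_CONTAINER_FILE, SINGLE_OBJECT_ENCODING, NO_KNOWN_HEADER }

    /** Guesses how an Avro-encoded Kafka value is framed by inspecting its first bytes. */
    public static Framing sniff(byte[] value) {
        // Object container file: ASCII "Obj" followed by 0x01; the writer schema sits in the file header
        if (value.length >= 4 && value[0] == 'O' && value[1] == 'b' && value[2] == 'j' && value[3] == 1) {
            return Framing.OBJECT_CONTAINER_FILE;
        }
        // Single-object encoding: 0xC3 0x01, then an 8-byte schema fingerprint (not the schema itself)
        if (value.length >= 10 && (value[0] & 0xFF) == 0xC3 && value[1] == 0x01) {
            return Framing.SINGLE_OBJECT_ENCODING;
        }
        // Anything else may be bare Avro binary, which has no header at all
        return Framing.NO_KNOWN_HEADER;
    }

    public static void main(String[] args) {
        System.out.println(sniff(new byte[] {'O', 'b', 'j', 1, 0}));                          // OBJECT_CONTAINER_FILE
        System.out.println(sniff(new byte[] {(byte) 0xC3, 0x01, 1, 2, 3, 4, 5, 6, 7, 8}));  // SINGLE_OBJECT_ENCODING
        System.out.println(sniff(new byte[] {0x06, 'a', 'b', 'c'}));                         // NO_KNOWN_HEADER
    }
}
```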
Solution 1:[1]
I wasn't able to find the answer to this, because the project switched to using a schema registry, but I wanted to at least post how it was done with a schema registry.
Instead of:
```java
FlinkKafkaConsumer<Sde> eventsConsumer = new FlinkKafkaConsumer<>(
        fsiProcessorProps.getKafkaEventsInput(),
        AvroDeserializationSchema.forSpecific(Sde.class),
        properties);
```
It was changed to:
```java
private static <T extends SpecificRecordBase> FlinkKafkaConsumerBase<T> getFlinkKafkaConsumer(
        Properties properties, String topic, Class<T> type,
        String schemaRegistryUrl, Map<String, String> sslConfiguration) {
    // Typing the schema with T removes the wildcard and the unchecked cast to DeserializationSchema<T>
    ConfluentRegistryAvroDeserializationSchema<T> schema =
            ConfluentRegistryAvroDeserializationSchema.forSpecific(type, schemaRegistryUrl, sslConfiguration);
    return new FlinkKafkaConsumer<>(topic, schema, properties);
}
```
I won't mark this as the accepted answer, but I wanted to offer it as at least another approach, one involving a schema registry, which is a good thing to use with Avro anyway.
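For context on what the registry-based schema reads: Confluent's Avro serializer prefixes each message with a 5-byte header, a magic byte `0` followed by the 4-byte big-endian schema ID assigned by the registry, and the Avro binary follows. `ConfluentRegistryAvroDeserializationSchema` reads that header and looks up the writer schema by ID before decoding. The plain-Java sketch below only parses the header; it does not contact a registry, and `ConfluentWireFormat` with its methods are hypothetical names for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentWireFormat {

    public static final byte MAGIC_BYTE = 0x0;

    /** Reads the schema ID from the 5-byte Confluent header, rejecting messages without it. */
    public static int schemaId(byte[] message) {
        if (message.length < 5 || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("message is not in the Confluent wire format");
        }
        // ByteBuffer defaults to big-endian, matching the header's byte order
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** Returns the Avro binary payload that follows the header. */
    public static byte[] avroPayload(byte[] message) {
        schemaId(message); // validates the header first
        return Arrays.copyOfRange(message, 5, message.length);
    }

    public static void main(String[] args) {
        byte[] msg = {0x00, 0x00, 0x00, 0x01, 0x2A, 0x06, 'a', 'b', 'c'};
        System.out.println(schemaId(msg));           // prints 298 (0x0000012A)
        System.out.println(avroPayload(msg).length); // prints 4
    }
}
```

This framing is also why messages written by a registry-aware producer generally cannot be read by the plain `AvroDeserializationSchema`, which would try to decode the header bytes as Avro data.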
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | Jicaar |