Deserializing Avro from Kafka as a SpecificRecord fails: Expecting type to be a PojoTypeInfo

I am using Flink v1.11.2 and Avro v1.10.1.

I am trying to deserialize Avro records from a Kafka topic as a SpecificRecord, but the job keeps failing with "Expecting type to be a PojoTypeInfo".

I was able to read the records as a GenericRecord using this:

FlinkKafkaConsumer<GenericRecord> eventsConsumer = new FlinkKafkaConsumer<>(
            fsiProcessorProps.getKafkaEventsInput(),
            AvroDeserializationSchema.forGeneric(Sde.getClassSchema()),
            properties);

I am now trying:

FlinkKafkaConsumer<Sde> eventsConsumer = new FlinkKafkaConsumer<>(
            fsiProcessorProps.getKafkaEventsInput(),
            AvroDeserializationSchema.forSpecific(Sde.class),
            properties);

but get the following error when trying to start the job:

"org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
    ... 7 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Expecting type to be a PojoTypeInfo
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
    ... 9 more
Caused by: java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
    at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:74)
    at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:56)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.getProducedType(AvroDeserializationSchema.java:168)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.getProducedType(KafkaDeserializationSchemaWrapper.java:66)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.getProducedType(FlinkKafkaConsumerBase.java:1066)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2172)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1608)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1569)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1551)
    at FsiProcessor.main(FsiProcessor.java:46)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 12 more
"

The Sde class is a typical generated Avro class, so I am not sure what I am missing. The data does not have the schema embedded (I also tried with the schema embedded and got the same error).



Solution 1:[1]

I never found the answer to the original problem because the project switched to a schema registry, but I wanted to post how it was done with a schema registry.

So instead of doing:

FlinkKafkaConsumer<Sde> eventsConsumer = new FlinkKafkaConsumer<>(
            fsiProcessorProps.getKafkaEventsInput(),
            AvroDeserializationSchema.forSpecific(Sde.class),
            properties);

It was changed to:

private static <T extends SpecificRecordBase> FlinkKafkaConsumerBase<T> getFlinkKafkaConsumer(
    Properties properties, String topic, Class<T> type, String schemaRegistryUrl, Map<String, String> sslConfiguration) {
  // Binding the class to T lets the compiler check the schema type, so no unchecked cast is needed.
  ConfluentRegistryAvroDeserializationSchema<T> schema =
      ConfluentRegistryAvroDeserializationSchema.forSpecific(
          type, schemaRegistryUrl, sslConfiguration);
  return new FlinkKafkaConsumer<>(topic, schema, properties);
}

I won't mark this as the answer, since it does not explain the original error, but it is at least another approach. It relies on a schema registry, which is a good thing to use with Avro anyway.
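On the original error: the stack trace shows the exception being thrown from AvroTypeInfo.generateFieldsFromAvroSchema while Flink builds type information for Sde, which suggests Flink's POJO analysis of the generated class did not produce a PojoTypeInfo. One way to investigate might be to check the generated class against the POJO rules in the Flink documentation (public class, public no-argument constructor, every non-static, non-transient field either public and non-final or reachable through a public getter and setter). The sketch below is a simplified, standard-library-only approximation of those rules and is not Flink's own analyzer. The class PojoRuleCheck and the two sample classes are hypothetical names made up for illustration, and an empty result does not prove that Flink would accept the class.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: a simplified approximation of Flink's documented POJO rules.
// It does NOT call Flink and may disagree with Flink's real type analysis.
final class PojoRuleCheck {

    /** Returns human-readable rule violations; an empty list means none were found. */
    static List<String> violations(Class<?> type) {
        List<String> problems = new ArrayList<>();
        int classMods = type.getModifiers();
        if (!Modifier.isPublic(classMods)) {
            problems.add("class is not public");
        }
        if (type.isMemberClass() && !Modifier.isStatic(classMods)) {
            problems.add("class is a non-static inner class");
        }
        try {
            type.getConstructor(); // only finds a public no-argument constructor
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        // Walk up the hierarchy so inherited fields are checked as well.
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int mods = f.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || f.isSynthetic()) {
                    continue; // these fields are not subject to the rules
                }
                if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) {
                    continue; // a public, non-final field needs no accessors
                }
                boolean getter = hasAccessor(type, "get", f, 0) || hasAccessor(type, "is", f, 0);
                if (!getter) {
                    problems.add("field '" + f.getName() + "' has no public getter");
                }
                if (!hasAccessor(type, "set", f, 1)) {
                    problems.add("field '" + f.getName() + "' has no public setter");
                }
            }
        }
        return problems;
    }

    // Compares names case-insensitively and ignores underscores (a simplification).
    private static String norm(String s) {
        return s.toLowerCase(Locale.ROOT).replace("_", "");
    }

    private static boolean hasAccessor(Class<?> type, String prefix, Field f, int paramCount) {
        String wanted = norm(prefix + f.getName());
        for (Method m : type.getMethods()) { // public methods, including inherited ones
            if (norm(m.getName()).equals(wanted) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    // Sample classes standing in for a generated record with and without setters.
    public static class WithSetters {
        private String name;
        public WithSetters() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static class WithoutSetters {
        private String name;
        public WithoutSetters() {}
        public String getName() { return name; }
    }

    public static void main(String[] args) {
        System.out.println(violations(WithSetters.class));
        System.out.println(violations(WithoutSetters.class));
    }
}
```

To try it on the real class, call PojoRuleCheck.violations(Sde.class) in a small test or main method. If it reports missing setters or a missing constructor, the options used when generating the Avro class would be a reasonable place to look next.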

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Jicaar