'Unable to send Kafka Avro Message to Message Channel <Failed to convert Generic Message to Outbound Message>

We are trying to push a Kafka notification to the external Kafka Topic by sending the Avro Schema Message to the Message Channel.

On sending the message to the channel, we are getting the below exception:

    Failed to send Message to channel 'DemoChannel'; 
        nested exception is java.lang.IllegalStateException: 
            Failed to convert message: 'GenericMessage [payload={"location":"US"}, headers={id=46cf666d-647f-36b7-f43c-259621b78842, contentType=avro/bytes, timestamp=1651571094238}]' to outbound message.
            org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'DemoChannel'; nested exception is java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"location":"US"}, headers={id=46cf666d-647f-36b7-f43c-259621b78842, contentType=avro/bytes, timestamp=1651571094238}]
            at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:167)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
            at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)

Below is the configuration for the Kafka Topic and Message channel in application.yml file of the Spring Boot service.

    cloud:
        stream:
          bindings:
            DemoChannel:
              destination: demoTest
              content-type: avro/bytes  
          kafka:
           binder:
            replication-factor: 1
            brokers: ${broker-ip-and-port}
            zkNodes: ${zookeeper-ip-and-port}
            autoCreateTopics: false
            zkConnectionTimeout: 36000

Below is the Message Channel class file:

             import org.springframework.cloud.stream.annotation.Output;
             import org.springframework.messaging.MessageChannel;
    
             public interface CustomDemoChannel {
                  @Output("DemoChannel")
                  MessageChannel customDemoChannel();
             }

Below is the Producer code trying to send the Avro Message to the Message channel

             //initialized by the autowired CustomDemoChannel variable
             MessageChannel messageChannel ; 
             
             //DemoChannel is the Avro Generated class file based on the Avro schema file
             //avroSchemaObject is constructed and initialized by the inner Builder class of the Avro generated DemoChannel class
             DemoChannel avroSchemaObject; 
             
             //Message to be published is built with payload
             Message<DemoChannel> message = MessageBuilder.withPayload(avroSchemaObject).build();
             
             //Sending the message to the message channel
             messageChannel.send(message);

How can I resolve this exception?



Solution 1:[1]

Thanks Soby!

The issue was resolved by making an additional change: i.e defining an AvroSchemaConverter and including this as a resource in the existing Kafka Producer class.

Step 1: Define your AvroSchemaConverter

@Configuration
public class AvroNoSchemaRegistryConfiguration {
    public static final String CONVERTER_DEMO = "DemoConverter";
    @Bean(name = CONVERTER_DEMO)
    public AvroSchemaMessageConverter demoConverter() {
        AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
        return converter;
    }
}

Step 2: Use the AvroSchemaConverter as a resource in your producer class file like below:

@Resource(name = AvroNoSchemaRegistryConfiguration.CONVERTER_DEMO)
private AvroSchemaMessageConverter converterDemo;

The resource will be autowired by the Spring Cloud Stream jars and the message gets converted into the Avro format at runtime allowing the message to get published to the topic.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 durgapmenon