Kafka consumer unit test with Avro Schema registry failing

I'm writing a consumer that listens to a Kafka topic and consumes a message whenever one is available. I've tested the code against a local Kafka instance and it works fine.

The unit/component tests, however, fail with an Avro schema registry URL error. I've tried the options I could find online, but none of them worked, and I'm not sure my approach is even correct. Please help.

Listener Class

@KafkaListener(topics = "positionmgmt.v1", containerFactory = "genericKafkaListenerFactory")
    public void receive(ConsumerRecord<String, GenericRecord> consumerRecord) {
        try {
            GenericRecord generic = consumerRecord.value();
            Object obj = generic.get("metadata");

            ObjectMapper mapper = new ObjectMapper();

            Header headerMetaData = mapper.readValue(obj.toString(), Header.class);

            System.out.println("Received payload :   " + consumerRecord.value());

            //Call backend with details in GenericRecord 

        } catch (Exception e) {
            System.out.println("Exception while reading message from Kafka " + e);
        }
    }

Kafka config

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> genericKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(genericConsumerFactory());
        return factory;
    }

public ConsumerFactory<String, GenericRecord> genericConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
        return new DefaultKafkaConsumerFactory<>(config);
    }

Avro Schema

{
   "type":"record",
   "name":"KafkaEvent",
   "namespace":"com.ms.model.avro",
   "fields":[
      {
         "name":"metadata",
         "type":{
            "name":"metadata",
            "type":"record",
            "fields":[
               {
                  "name":"correlationid",
                  "type":"string",
                  "doc":"this is corrleation id for transaction"
               },
               {
                  "name":"subject",
                  "type":"string",
                  "doc":"this is subject for transaction"
               },
               {
                  "name":"version",
                  "type":"string",
                  "doc":"this is version for transaction"
               }
            ]
         }
      },
      {
         "name":"name",
         "type":"string"
      },
      {
         "name":"dept",
         "type":"string"
      },
      {
         "name":"empnumber",
         "type":"string"
      }
   ]
}

This is the test code I tried:

@ComponentTest
    @RunWith(SpringRunner.class)
    @EmbeddedKafka(partitions = 1, topics = { "positionmgmt.v1" })
    @SpringBootTest(classes={Application.class})
    @DirtiesContext
    public class ConsumeKafkaMessageTest {

      private static final String TEST_TOPIC = "positionmgmt.v1";

      @Autowired(required=true)
      EmbeddedKafkaBroker embeddedKafkaBroker;

      private Schema schema;

      private  SchemaRegistryClient schemaRegistry;
      private  KafkaAvroSerializer avroSerializer;
      private  KafkaAvroDeserializer avroDeserializer;

      private MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
      private String registryUrl = "unused";

      private String avroSchema = string representation of avro schema

      @BeforeEach
      public void setUp() throws Exception {
        Schema.Parser parser = new Schema.Parser();
        schema = parser.parse(avroSchema);

        mockSchemaRegistryClient.register("Vendors-value", schema);
      }

      @Test
      public void consumeKafkaMessage_receive_sucess() {

        Schema metadataSchema = schema.getField("metadata").schema();
        GenericRecord metadata = new GenericData.Record(metadataSchema);
        metadata.put("version", "1.0");
        metadata.put("correlationid", "correlationid");
        metadata.put("subject", "metadata");

        GenericRecord record = new GenericData.Record(schema);
        record.put("metadata", metadata);
        record.put("name", "ABC");
        record.put("dept", "XYZ");

        Consumer<String, GenericRecord> consumer = configureConsumer();
        Producer<String, GenericRecord> producer = configureProducer();

        ProducerRecord<String, GenericRecord> prodRecord = new ProducerRecord<String, GenericRecord>(TEST_TOPIC, record);

        producer.send(prodRecord);

        ConsumerRecord<String, GenericRecord> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
        assertNotNull(singleRecord.value());

        consumer.close();
        producer.close();

      }

      private Consumer<String, GenericRecord> configureConsumer() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupid", "true", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer<String, GenericRecord> consumer = new DefaultKafkaConsumerFactory<String, GenericRecord>(consumerProps).createConsumer();
        consumer.subscribe(Collections.singleton(TEST_TOPIC));
        return consumer;
      }

      private Producer<String, GenericRecord> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, mockSchemaRegistryClient);
        producerProps.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, "false");
        return new DefaultKafkaProducerFactory<String, GenericRecord>(producerProps).createProducer();
      }

}

Error

component.com.ms.listener.ConsumeKafkaMessageTest > consumeKafkaMessage_receive_sucess() FAILED
    org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:318)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:305)
        at component.com.ms.listener.ConsumeKafkaMessageTest.configureProducer(ConsumeKafkaMessageTest.java:125)
        at component.com.ms.listener.ConsumeKafkaMessageTest.consumeKafkaMessage_receive_sucess(ConsumeKafkaMessageTest.java:97)

        Caused by:
        io.confluent.common.config.ConfigException: Invalid value io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient@20751870 for configuration schema.registry.url: Expected a comma separated list.
            at io.confluent.common.config.ConfigDef.parseType(ConfigDef.java:345)
            at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:249)
            at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
            at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:105)
            at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
            at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
            at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
            at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:372)
            ... 5 more


Solution 1:[1]

I investigated a bit and found that the problem is in the CachedSchemaRegistryClient used by KafkaAvroSerializer/KafkaAvroDeserializer. It fetches the schema definitions from the Confluent Schema Registry.

You already have your schema definition locally, so you don't need to go to the Schema Registry for it (at least in your tests).

I had a similar problem and I solved it by creating a custom KafkaAvroSerializer/KafkaAvroDeserializer.

This is a sample KafkaAvroSerializer. It is rather simple: you just extend the provided KafkaAvroSerializer and tell it to use MockSchemaRegistryClient.

public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

This is a sample KafkaAvroDeserializer. When the deserialize method is called, you need to tell it which schema to use.

public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);  
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(final Schema schema$) {
        return new MockSchemaRegistryClient() {
            @Override
            public synchronized Schema getById(int id) {
                return schema$;
            }
        };
    }
}
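
Overriding getById is enough here because of how the Confluent serializers frame each message. As far as the documented wire format goes, the value starts with a magic byte (0), followed by a 4-byte big-endian schema ID, followed by the Avro binary payload. The deserializer reads that ID and asks the registry client for the matching schema, so a mock that returns the same schema for every ID should suffice in a single-schema test. The following JDK-only sketch illustrates the framing; the class and method names are hypothetical, and this is not the Confluent implementation.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper that mimics the framing only; not part of any Confluent library.
public class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 1 + 4; // magic byte + schema id

    /** Builds: magic byte, 4-byte big-endian schema id, then the Avro payload. */
    public static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    /** Reads the schema id that a deserializer would look up in the registry. */
    public static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    /** Returns the bytes after the header, i.e. the Avro-encoded record. */
    public static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    public static void main(String[] args) {
        // 0x06 is the Avro length prefix for a 3-character string, followed by "ABC".
        byte[] message = frame(7, new byte[] {0x06, 0x41, 0x42, 0x43});
        System.out.println("schema id = " + schemaId(message));
        System.out.println("payload bytes = " + payload(message).length);
    }
}
```

The ID is assigned by whichever registry client the serializer used, so a freshly created mock on the consumer side has no record of it. That is the gap the getById override papers over.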

The last step is to tell Spring to use the custom serializer/deserializer:

spring.kafka.producer.properties.schema.registry.url= not-used
spring.kafka.producer.value-serializer = CustomKafkaAvroSerializer
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id = showcase-producer-id

spring.kafka.consumer.properties.schema.registry.url= not-used
spring.kafka.consumer.value-deserializer = CustomKafkaAvroDeserializer
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id = showcase-consumer-id
spring.kafka.auto.offset.reset = earliest

spring.kafka.producer.auto.register.schemas= true
spring.kafka.properties.specific.avro.reader= true

I wrote a short blog post about that: https://medium.com/@igorvlahek1/no-need-for-schema-registry-in-your-spring-kafka-tests-a5b81468a0e1?source=friends_link&sk=e55f73b86504e9f577e259181c8d0e23

Link to the working sample project: https://github.com/ivlahek/kafka-avro-without-registry

Solution 2:[2]

The answer from @ivlahek works, but if you are looking at this example three years later, you might want to make a slight modification to CustomKafkaAvroDeserializer:

private static SchemaRegistryClient getMockClient(final Schema schema) {
    return new MockSchemaRegistryClient() {
        @Override
        public ParsedSchema getSchemaBySubjectAndId(String subject, int id)
                throws IOException, RestClientException {
            return new AvroSchema(schema);
        }
    };
}

Solution 3:[3]

As the error says, you need to provide a string for the registry URL in the producer config, not an object.

Since you're using the Mock class, that string could be anything...

However, you'll need to construct the serializer yourself, passing it the registry instance:

Serializer serializer = new KafkaAvroSerializer(mockSchemaRegistry);
 // make config map with ("schema.registry.url", "unused") 
serializer.configure(config, false);

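Otherwise, it will try to create a non-mocked client.

Then pass that instance to the producer factory. Putting the instance itself into the properties map will not work, because value.serializer expects a class or a class name:

new DefaultKafkaProducerFactory<String, GenericRecord>(producerProps, new StringSerializer(), serializer).createProducer();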

Solution 4:[4]

If your @KafkaListener is in the test class, you can read the value with StringDeserializer and then convert it to the desired class manually:

    @Autowired
    private MyKafkaAvroDeserializer myKafkaAvroDeserializer;

    @KafkaListener(topics = "test")
    public void inputData(ConsumerRecord<?, ?> consumerRecord) {
        log.info("received payload='{}'", consumerRecord.value());

        GenericRecord genericRecord = (GenericRecord) myKafkaAvroDeserializer.deserialize("test", consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));

        Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);
    }

@Component
public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        this.schemaRegistry = getMockClient(Myclass.SCHEMA$);
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(final Schema schema$) {
        return new MockSchemaRegistryClient() {
            @Override
            public synchronized org.apache.avro.Schema getById(int id) {
                return schema$;
            }
        };
    }
}

Remember to add the schema registry URL and the key/value deserializers in application.yml, even though the registry won't be used:

    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      schema.registry.url: http://localhost:8080
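
One caveat with this approach, offered as an observation rather than as part of the original answer: StringDeserializer decodes the record bytes as UTF-8 by default, and Avro binary data is not guaranteed to be valid UTF-8. Converting the value to a String and back with getBytes may therefore not reproduce the original bytes. The sketch below shows the effect with plain JDK classes; the class name, method name and sample byte arrays are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class; it only illustrates the bytes -> String -> bytes round trip.
public class StringRoundTrip {

    /** Decodes the bytes as UTF-8 and encodes the resulting String again. */
    public static byte[] viaUtf8String(byte[] original) {
        return new String(original, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // 5-byte header (magic byte + schema id 1) followed by the Avro string "ABC".
        byte[] ascii = {0x00, 0x00, 0x00, 0x00, 0x01, 0x06, 0x41, 0x42, 0x43};
        // Same header followed by 0x80 0x01, the Avro varint for a length of 64.
        byte[] binary = {0x00, 0x00, 0x00, 0x00, 0x01, (byte) 0x80, 0x01};

        // ASCII-only bytes survive the round trip.
        System.out.println(Arrays.equals(ascii, viaUtf8String(ascii)));   // true
        // 0x80 is not valid UTF-8 on its own and is replaced while decoding.
        System.out.println(Arrays.equals(binary, viaUtf8String(binary))); // false
    }
}
```

If the round trip turns out to be lossy for your payloads, configuring ByteArrayDeserializer as the value deserializer and passing the byte[] straight to deserialize would avoid the conversion.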

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2 Yuliya Polyeno
Solution 3
Solution 4