'Kafka consumer unit test with Avro Schema registry failing
I'm writing a consumer which listens to a Kafka topic and consumes message whenever message is available. I've tested the logic/code by running Kafka locally and it's working fine.
While writing the unit/component test cases, it's failing with avro schema registry url error. I've tried different options available on internet but could not find anything working. I am not sure if my approach is even correct. Please help.
Listener Class
@KafkaListener(topics = "positionmgmt.v1", containerFactory = "genericKafkaListenerFactory")
public void receive(ConsumerRecord<String, GenericRecord> consumerRecord) {
try {
GenericRecord generic = consumerRecord.value();
Object obj = generic.get("metadata");
ObjectMapper mapper = new ObjectMapper();
Header headerMetaData = mapper.readValue(obj.toString(), Header.class);
System.out.println("Received payload : " + consumerRecord.value());
//Call backend with details in GenericRecord
}catch (Exception e){
System.out.println("Exception while reading message from Kafka " + e );
}
Kafka config
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> genericKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(genericConsumerFactory());
return factory;
}
public ConsumerFactory<String, GenericRecord> genericConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
return new DefaultKafkaConsumerFactory<>(config);
}
Avro Schema
{
"type":"record",
"name":"KafkaEvent",
"namespace":"com.ms.model.avro",
"fields":[
{
"name":"metadata",
"type":{
"name":"metadata",
"type":"record",
"fields":[
{
"name":"correlationid",
"type":"string",
"doc":"this is corrleation id for transaction"
},
{
"name":"subject",
"type":"string",
"doc":"this is subject for transaction"
},
{
"name":"version",
"type":"string",
"doc":"this is version for transaction"
}
]
}
},
{
"name":"name",
"type":"string"
},
{
"name":"dept",
"type":"string"
},
{
"name":"empnumber",
"type":"string"
}
]
}
This is my test code which I tried...
@ComponentTest
@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = { "positionmgmt.v1" })
@SpringBootTest(classes={Application.class})
@DirtiesContext
public class ConsumeKafkaMessageTest {
private static final String TEST_TOPIC = "positionmgmt.v1";
@Autowired(required=true)
EmbeddedKafkaBroker embeddedKafkaBroker;
private Schema schema;
private SchemaRegistryClient schemaRegistry;
private KafkaAvroSerializer avroSerializer;
private KafkaAvroDeserializer avroDeserializer;
private MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
private String registryUrl = "unused";
private String avroSchema = string representation of avro schema
@BeforeEach
public void setUp() throws Exception {
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(avroSchema);
mockSchemaRegistryClient.register("Vendors-value", schema);
}
@Test
public void consumeKafkaMessage_receive_sucess() {
Schema metadataSchema = schema.getField("metadata").schema();
GenericRecord metadata = new GenericData.Record(metadataSchema);
metadata.put("version", "1.0");
metadata.put("correlationid", "correlationid");
metadata.put("subject", "metadata");
GenericRecord record = new GenericData.Record(schema);
record.put("metadata", metadata);
record.put("name", "ABC");
record.put("dept", "XYZ");
Consumer<String, GenericRecord> consumer = configureConsumer();
Producer<String, GenericRecord> producer = configureProducer();
ProducerRecord<String, GenericRecord> prodRecord = new ProducerRecord<String, GenericRecord>(TEST_TOPIC, record);
producer.send(prodRecord);
ConsumerRecord<String, GenericRecord> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
assertNotNull(singleRecord.value());
consumer.close();
producer.close();
}
private Consumer<String, GenericRecord> configureConsumer() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupid", "true", embeddedKafkaBroker);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<String, GenericRecord> consumer = new DefaultKafkaConsumerFactory<String, GenericRecord>(consumerProps).createConsumer();
consumer.subscribe(Collections.singleton(TEST_TOPIC));
return consumer;
}
private Producer<String, GenericRecord> configureProducer() {
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, mockSchemaRegistryClient);
producerProps.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, "false");
return new DefaultKafkaProducerFactory<String, GenericRecord>(producerProps).createProducer();
}
}
Error
component.com.ms.listener.ConsumeKafkaMessageTest > consumeKafkaMessage_receive_sucess() FAILED
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:318)
at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:305)
at component.com.ms.listener.ConsumeKafkaMessageTest.configureProducer(ConsumeKafkaMessageTest.java:125)
at component.com.ms.listener.ConsumeKafkaMessageTest.consumeKafkaMessage_receive_sucess(ConsumeKafkaMessageTest.java:97)
Caused by:
io.confluent.common.config.ConfigException: Invalid value io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient@20751870 for configuration schema.registry.url: Expected a comma separated list.
at io.confluent.common.config.ConfigDef.parseType(ConfigDef.java:345)
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:249)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:105)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:372)
... 5 more
Solution 1:[1]
I investigated it a bit and I found out that the problem is in the CashedSchemaRegistryClient that is used by the KafkaAvroSerializer/Deserializer. It is used to fetch the schema definitions from the Confluent Schema Registry.
You already have your schema definition locally so you don't need to go to Schema Registry for them. (at least in your tests)
I had a similar problem and I solved it by creating a custom KafkaAvroSerializer/KafkaAvroDeserializer.
This is a sample of KafkaAvroSerializer. It is rather simple. You just need to extend the provided KafkaAvroSerializer and tell him to use MockSchemaRegistryClient.
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
public CustomKafkaAvroSerializer() {
super();
super.schemaRegistry = new MockSchemaRegistryClient();
}
public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
super(new MockSchemaRegistryClient());
}
public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
super(new MockSchemaRegistryClient(), props);
}
}
This is a sample of KafkaAvroDeserializer. When the deserialize method is called you need to tell him which schema to use.
public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
@Override
public Object deserialize(String topic, byte[] bytes) {
this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);
return super.deserialize(topic, bytes);
}
private static SchemaRegistryClient getMockClient(final Schema schema$) {
return new MockSchemaRegistryClient() {
@Override
public synchronized Schema getById(int id) {
return schema$;
}
};
}
}
The last step is to tell spring to use created Serializer/Deserializer
spring.kafka.producer.properties.schema.registry.url= not-used
spring.kafka.producer.value-serializer = CustomKafkaAvroSerializer
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id = showcase-producer-id
spring.kafka.consumer.properties.schema.registry.url= not-used
spring.kafka.consumer.value-deserializer = CustomKafkaAvroDeserializer
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id = showcase-consumer-id
spring.kafka.auto.offset.reset = earliest
spring.kafka.producer.auto.register.schemas= true
spring.kafka.properties.specific.avro.reader= true
I wrote a short blog post about that: https://medium.com/@igorvlahek1/no-need-for-schema-registry-in-your-spring-kafka-tests-a5b81468a0e1?source=friends_link&sk=e55f73b86504e9f577e259181c8d0e23
Link to the working sample project: https://github.com/ivlahek/kafka-avro-without-registry
Solution 2:[2]
The answer from @ivlahek is working, but if you look at this example 3 year later you might want to do slight modification to CustomKafkaAvroDeserializer
private static SchemaRegistryClient getMockClient(final Schema schema) {
return new MockSchemaRegistryClient() {
@Override
public ParsedSchema getSchemaBySubjectAndId(String subject, int id)
throws IOException, RestClientException {
return new AvroSchema(schema);
}
};
}
Solution 3:[3]
As the error says, you need to provide a string to the registry in the producer config, not an object.
Since you're using the Mock class, that string could be anything...
However, you'll need to construct the serializers given the registry instance
Serializer serializer = new KafkaAvroSerializer(mockSchemaRegistry);
// make config map with ("schema.registry.url", "unused")
serializer.configure(config, false);
Otherwise, it will try to create a non-mocked client
And put that into the properties
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
Solution 4:[4]
If your @KafkaListener is in test class then you can read it in StringDeserializer then convert it to the desired class manually
@Autowired
private MyKafkaAvroDeserializer myKafkaAvroDeserializer;
@KafkaListener( topics = "test")
public void inputData(ConsumerRecord<?, ?> consumerRecord) {
log.info("received payload='{}'", consumerRecord.toString(),consumerRecord.value());
GenericRecord genericRecord = (GenericRecord)myKafkaAvroDeserializer.deserialize("test",consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));
Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);
}
@Component
public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {
@Override
public Object deserialize(String topic, byte[] bytes) {
this.schemaRegistry = getMockClient(Myclass.SCHEMA$);
return super.deserialize(topic, bytes);
}
private static SchemaRegistryClient getMockClient(final Schema schema$) {
return new MockSchemaRegistryClient() {
@Override
public synchronized org.apache.avro.Schema getById(int id) {
return schema$;
}
};
}
}
Remember to add schema registry and key/value serializer in application.yml although it won't be used
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
schema.registry.url :http://localhost:8080
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | Yuliya Polyeno |
Solution 3 | |
Solution 4 |