'This error handler cannot process 'SerializationException's directly;
The image below is my topic. Every once in a while, a value other than an empty or json is returned.
If it is null, it throws an error as below and enters the loop.
2022-04-28 17:07:07.007 ERROR org.spr.kaf.lis.KafkaMessageListenerContainer$ListenerConsumer.error:149: Consumer exception [consumeNotificationSender_175303] java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.3.jar:2.7.3] at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.3.jar:2.7.3]
Consumer configs,
protected Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, payloadClass);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(JsonDeserializer.TRUSTED_PACKAGES, payloadClass.getPackage());
return props;
}
protected ConsumerFactory<String, T> consumerFactory() {
var deserializer = new JsonDeserializer<>(payloadClass);
deserializer.addTrustedPackages("*");
deserializer.setTypeMapper(new DefaultJackson2JavaTypeMapper());
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), deserializer);
}
Procuder configs,
private KafkaTemplate<K, V> createProducerKafkaTemplate(final String bootstrapServers, final String acks) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
if (acks != null){
configProps.put(ProducerConfig.ACKS_CONFIG, acks);
}
DefaultKafkaProducerFactory<K, V> factory = new DefaultKafkaProducerFactory<>(configProps);
KafkaTemplate<K, V> kTemplate = new KafkaTemplate<>(factory);
kTemplate.setProducerListener(getProducerListener());
return kTemplate;
}
private ProducerListener<K, V> getProducerListener() {
return new ProducerListener<>() {
@Override
public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
ProducerListener.super.onSuccess(producerRecord, recordMetadata);
}
@Override
public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
ProducerListener.super.onError(producerRecord, recordMetadata, exception);
log.error("KAFKA onError", exception);
}
};
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|