'This error handler cannot process 'SerializationException's directly;

The image below is my topic. Every once in a while, a value other than an empty or json is returned.

enter image description here

If it is null, it throws an error as below and enters the loop.

2022-04-28 17:07:07.007 ERROR org.spr.kaf.lis.KafkaMessageListenerContainer$ListenerConsumer.error:149: Consumer exception [consumeNotificationSender_175303] java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194) ~[spring-kafka-2.7.3.jar:2.7.3] at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.3.jar:2.7.3]

Consumer configs,

protected Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, payloadClass);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, payloadClass.getPackage());
        return props;
    }

    protected ConsumerFactory<String, T> consumerFactory() {
        var deserializer = new JsonDeserializer<>(payloadClass);
        deserializer.addTrustedPackages("*");
        deserializer.setTypeMapper(new DefaultJackson2JavaTypeMapper());
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), deserializer);
    }

Procuder configs,

private KafkaTemplate<K, V> createProducerKafkaTemplate(final String bootstrapServers, final String acks) {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    if (acks != null){
        configProps.put(ProducerConfig.ACKS_CONFIG, acks);
    }
    DefaultKafkaProducerFactory<K, V> factory = new DefaultKafkaProducerFactory<>(configProps);
    KafkaTemplate<K, V> kTemplate = new KafkaTemplate<>(factory);
    kTemplate.setProducerListener(getProducerListener());
    return kTemplate;
}

private ProducerListener<K, V> getProducerListener() {
    return new ProducerListener<>() {
        @Override
        public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
            ProducerListener.super.onSuccess(producerRecord, recordMetadata);

        }
        @Override
        public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
            ProducerListener.super.onError(producerRecord, recordMetadata, exception);
            log.error("KAFKA onError", exception);
        }
    };
}


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source