'How to test Kafka OnFailure callback with Junit?

I have the following code to send data to Kafka:

@Service
public class KafkaSender{
    @Autowired
    private KafkaTemplate<String, Employee> kafkaTemplate;
    public void sendMessage(Employee employee) {

        ObjectMapper objectMapper = new ObjectMapper();
        ListenableFuture<SendResult<String, Employee>> listenableFuture = kafkaTemplate.send(topic,employee);

        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Employee>>() {
            @Override
            public void onSuccess(SendResult<String, Employee> result) {
                // method to save in DB
                saveInDatabaseMethod(result.getProducerRecord());
            }

            @Override
            public void onFailure(Throwable ex) {
                // class cast exception occur here
                ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();
                saveInDatabaseMethod(producerRecord);
            }
        }
    }
}

I am able to test the OnSucess callback scenario, but i am not able to test the OnFailure one.

@Test
void test() throws InterruptedException, ExecutionException {

    Throwable ex = mock(Throwable.class);
    Employee employee = new Employee();

    when(kafkaTemplate.send(null,employee )).thenReturn(responseFuture);
    when(sendResult.getProducerRecord()).thenReturn(producerRecord);
    when(producerRecord.value()).thenReturn(employee);

    doAnswer(invocationOnMock -> {
        ListenableFutureCallback<SendResult<String, Employee>> listenableFutureCallback = invocationOnMock.getArgument(0);
        listenableFutureCallback.onFailure(ex);
        return null;
    }).when(responseFuture).addCallback(any(ListenableFutureCallback.class));
    
    kafkaSender.sendMessage(employee);
}

The above test throws:

java.lang.ClassCastException: org.mockito.codegen.Throwable$MockitoMock$2137573915 cannot be cast to org.springframework.kafka.core.KafkaProducerException



Solution 1:[1]

ProducerRecord<String, Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();

Your mock is not calling the callback with a KPE, its calling it with this

Throwable ex = mock(Throwable.class);

You need to wrap it in a KPE.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Gary Russell