'Can one Kafka Producer Class have multiple @EventListener methods?

Im using Spring Kafka and wrote Producer Class

@Component
@RequiredArgsConstructor
class Producer {

    private static final String TOPIC = "channels";
    private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @EventListener(ApplicationStartedEvent.class)
    public void channels_01() throws IOException {

    }

    @EventListener(ApplicationStartedEvent.class)
    public void channels_02()throws IOException {
        
    }

    @EventListener(ApplicationStartedEvent.class)
    public void channels_03()throws IOException {
        
    }
}

Is it possible to run 3 @Eventlistener annotated methods as producer simutaneouly? 3 methods are sending records to exactly same topic. Will spring container and kafka server recognize them as 3 separate producer clients?



Solution 1:[1]

If you want to have three producer clients, then you need to have three KafkaTemplate and use a producerPerThread = true option of the DefaultKafkaProducerFactory:

/**
 * Set to true to create a producer per thread instead of singleton that is shared by
 * all clients. Clients <b>must</b> call {@link #closeThreadBoundProducer()} to
 * physically close the producer when it is no longer needed. These producers will not
 * be closed by {@link #destroy()} or {@link #reset()}.
 * @param producerPerThread true for a producer per thread.
 * @since 2.3
 * @see #closeThreadBoundProducer()
 */
public void setProducerPerThread(boolean producerPerThread) {

Then you need to ensure that ApplicationEventMulticaster is asynchronous. See SimpleApplicationEventMulticaster:

/**
 * Set a custom executor (typically a {@link org.springframework.core.task.TaskExecutor})
 * to invoke each listener with.
 * <p>Default is equivalent to {@link org.springframework.core.task.SyncTaskExecutor},
 * executing all listeners synchronously in the calling thread.
 * <p>Consider specifying an asynchronous task executor here to not block the
 * caller until all listeners have been executed. However, note that asynchronous
 * execution will not participate in the caller's thread context (class loader,
 * transaction association) unless the TaskExecutor explicitly supports this.
 * @see org.springframework.core.task.SyncTaskExecutor
 * @see org.springframework.core.task.SimpleAsyncTaskExecutor
 */
public void setTaskExecutor(@Nullable Executor taskExecutor) {

Register a bean with name applicationEventMulticaster for it and set a desired TaskExecutor to ensure that your @EventListener methods are called in parallel.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Artem Bilan