No broker/node available in test with Kafka in TestContainers

I am trying to create a bare-bones integration test for Kafka with TestContainers: just publish a message to a topic and check that it arrives (entire setup below).

SkeletonTests.kt

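import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.time.Duration

@Testcontainers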
class SkeletonTests {
  @Container
  private val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))

  @Test
  fun `do nothing special`() {
    // Arrange
    val producer = KafkaProducer(
      mapOf(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers),
      StringSerializer(),
      StringSerializer()
    )

    val consumer = KafkaConsumer(
      mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 1,
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
        ConsumerConfig.GROUP_ID_CONFIG to "test-group-id"
      ),
      StringDeserializer(),
      StringDeserializer()
    ).apply { subscribe(listOf("topic")) }

    // Act
    producer.send(ProducerRecord("topic", "Hello there!"))
    producer.flush()

    // Assert
    // JUnit expects (expected, actual), so the literal goes first
    assertEquals("Hello there!", consumer.poll(Duration.ofSeconds(3)).first().value())
  }
}

build.gradle.kts

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    kotlin("jvm") version "1.5.31"
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.apache.kafka:kafka-clients:3.1.0")
    implementation("ch.qos.logback:logback-core:1.2.11")
    implementation("ch.qos.logback:logback-classic:1.2.11")
    implementation("org.slf4j:slf4j-api:1.7.36")
    testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
    testImplementation("org.testcontainers:kafka:1.17.1")
    testImplementation("org.testcontainers:junit-jupiter:1.17.1")
}

tasks.test {
    useJUnitPlatform()
}

tasks.withType<KotlinCompile>() {
    kotlinOptions.jvmTarget = "11"
}

logback.xml

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>
  <root level="debug">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>

The test passes (ProducerTests > do nothing special() PASSED); however, the log is flooded with producer and consumer warnings. Is this expected? Am I missing some broker/leader configuration that would make these errors go away?

Producer:

02:12:11.204 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
02:12:11.255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node localhost:61785 (id: 1 rack: null) for sending metadata request
02:12:11.255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node localhost:61785 (id: 1 rack: null) using address localhost/127.0.0.1
02:12:11.255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:224)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:526)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
    at java.base/java.lang.Thread.run(Thread.java:829)
02:12:11.256 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 1 disconnected.
02:12:11.256 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:61785) could not be established. Broker may not be available.

02:14:39.670 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 3 : {topic=LEADER_NOT_AVAILABLE}

02:14:39.670 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Requesting metadata update for topic topic due to error LEADER_NOT_AVAILABLE

Consumer:

02:21:11.351 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Initiating connection to node localhost:61785 (id: 1 rack: null) using address localhost/127.0.0.1
02:21:11.352 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:224)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:526)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1374)
02:21:11.352 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Node 1 disconnected.
02:21:11.352 [kafka-coordinator-heartbeat-thread | test-group-id] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Connection to node 1 (localhost/127.0.0.1:61785) could not be established. Broker may not be available.
02:21:11.353 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] No broker available to send FindCoordinator request

02:21:11.543 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] No broker available to send FindCoordinator request
02:21:11.543 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Give up sending metadata request since no node is available
02:21:11.544 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Sending FindCoordinator request to broker localhost:61785 (id: 1 rack: null)
02:21:11.544 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Initiating connection to node localhost:61785 (id: 1 rack: null) using address localhost/127.0.0.1
02:21:11.545 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:224)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:526)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1374)
02:21:11.545 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Node 1 disconnected.
02:21:11.545 [kafka-coordinator-heartbeat-thread | test-group-id] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Connection to node 1 (localhost/127.0.0.1:61785) could not be established. Broker may not be available.
02:21:11.545 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=consumer-test-group-id-1, correlationId=14) due to node 1 being disconnected
02:21:11.545 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] FindCoordinator request failed due to org.apache.kafka.common.errors.DisconnectException

Update: I removed the Spring dependencies completely; however, the problem persists, which suggests I am misconfiguring TestContainers.
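
One thing worth ruling out, stated as an assumption rather than a confirmed diagnosis: the test above never closes the producer or the consumer. A non-static @Container field is stopped after each test method, so the clients' background threads (the producer network thread and the consumer heartbeat thread seen in the logs) may simply keep trying to reconnect to a broker that is already gone. That would match the "Connection refused" and "Broker may not be available" lines. The LEADER_NOT_AVAILABLE warning is usually a separate, transient effect of the topic being auto-created on the first metadata request. Below is a minimal sketch of the same test with both clients closed before the container stops; the class name and test name are hypothetical, everything else is taken from the setup above.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.time.Duration

@Testcontainers
class ClosingClientsSketch { // hypothetical name, not part of the original project
  @Container
  private val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))

  @Test
  fun `clients are closed before the container stops`() {
    val producer = KafkaProducer(
      mapOf(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers),
      StringSerializer(),
      StringSerializer()
    )
    val consumer = KafkaConsumer(
      mapOf(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 1,
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
        ConsumerConfig.GROUP_ID_CONFIG to "test-group-id"
      ),
      StringDeserializer(),
      StringDeserializer()
    )

    // Both clients implement java.io.Closeable, so `use` calls close()
    // when the block exits, even if the assertion throws.
    producer.use { p ->
      consumer.use { c ->
        c.subscribe(listOf("topic"))

        p.send(ProducerRecord("topic", "Hello there!"))
        p.flush()

        val records = c.poll(Duration.ofSeconds(3))
        assertEquals("Hello there!", records.first().value())
      }
    }
  }
}
```

If the reconnect warnings disappear with this change, the container configuration itself was probably fine and the noise came from clients outliving the broker. Running it requires a local Docker daemon, as with the original test.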



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
