No broker/node available in test with Kafka in TestContainers
I am trying to create a bare-bones skeleton integration test for Kafka with TestContainers: just publish a message to a topic and check that it arrives (entire setup below).
SkeletonTests.kt
@Testcontainers
class SkeletonTests {

    @Container
    private val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))

    @Test
    fun `do nothing special`() {
        // Arrange
        val producer = KafkaProducer(
            mapOf(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers),
            StringSerializer(),
            StringSerializer()
        )
        val consumer = KafkaConsumer(
            mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers,
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 1,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
                ConsumerConfig.GROUP_ID_CONFIG to "test-group-id"
            ),
            StringDeserializer(),
            StringDeserializer()
        ).apply { subscribe(listOf("topic")) }

        // Act
        producer.send(ProducerRecord("topic", "Hello there!"))
        producer.flush()

        // Assert (JUnit 5 expects the expected value first, then the actual one)
        assertEquals("Hello there!", consumer.poll(Duration.ofSeconds(3)).first().value())
    }
}
build.gradle.kts
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    kotlin("jvm") version "1.5.31"
}

repositories {
    mavenCentral()
}

dependencies {
    implementation("org.apache.kafka:kafka-clients:3.1.0")
    implementation("ch.qos.logback:logback-core:1.2.11")
    implementation("ch.qos.logback:logback-classic:1.2.11")
    implementation("org.slf4j:slf4j-api:1.7.36")
    testImplementation("org.junit.jupiter:junit-jupiter:5.8.2")
    testImplementation("org.testcontainers:kafka:1.17.1")
    testImplementation("org.testcontainers:junit-jupiter:1.17.1")
}

tasks.test {
    useJUnitPlatform()
}

tasks.withType<KotlinCompile>() {
    kotlinOptions.jvmTarget = "11"
}
logback.xml
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="debug">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
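Note that the root logger in this configuration is at debug, so the Kafka clients' DEBUG output (connection attempts, metadata requests) is printed in full. A minimal sketch, assuming Logback is the bound SLF4J backend as in the build file above, of raising only the `org.apache.kafka` loggers to WARN from test code. The function name `quietKafkaClientLogging` is hypothetical, and this only hides DEBUG lines; WARN messages from the clients would still appear.

```kotlin
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
import org.slf4j.LoggerFactory

// Hypothetical helper, not part of the original setup.
// Raises only the Kafka client loggers to WARN; the root logger keeps its configured level.
fun quietKafkaClientLogging() {
    // Assumption: Logback is the SLF4J binding, so the cast to the Logback Logger succeeds.
    val kafkaLogger = LoggerFactory.getLogger("org.apache.kafka") as Logger
    kafkaLogger.level = Level.WARN
}
```

It could be called once before the test runs, for example from a `@BeforeAll` method.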
The test passes (ProducerTests > do nothing special() PASSED), however the log is flooded with producer and consumer warnings. Is this expected? Am I missing some broker/leader configuration that would make these errors go away?
Producer:
02:12:11.204 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
02:12:11.255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node localhost:61785 (id: 1 rack: null) for sending metadata request
02:12:11.255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node localhost:61785 (id: 1 rack: null) using address localhost/127.0.0.1
02:12:11.255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:224)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:526)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at java.base/java.lang.Thread.run(Thread.java:829)
02:12:11.256 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node 1 disconnected.
02:12:11.256 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:61785) could not be established. Broker may not be available.
02:14:39.670 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 3 : {topic=LEADER_NOT_AVAILABLE}
02:14:39.670 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Requesting metadata update for topic topic due to error LEADER_NOT_AVAILABLE
Consumer:
02:21:11.351 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Initiating connection to node localhost:61785 (id: 1 rack: null) using address localhost/127.0.0.1
02:21:11.352 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:224)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:526)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1374)
02:21:11.352 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Node 1 disconnected.
02:21:11.352 [kafka-coordinator-heartbeat-thread | test-group-id] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Connection to node 1 (localhost/127.0.0.1:61785) could not be established. Broker may not be available.
02:21:11.353 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] No broker available to send FindCoordinator request
02:21:11.543 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] No broker available to send FindCoordinator request
02:21:11.543 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Give up sending metadata request since no node is available
02:21:11.544 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Sending FindCoordinator request to broker localhost:61785 (id: 1 rack: null)
02:21:11.544 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Initiating connection to node localhost:61785 (id: 1 rack: null) using address localhost/127.0.0.1
02:21:11.545 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:224)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:526)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:551)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1374)
02:21:11.545 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Node 1 disconnected.
02:21:11.545 [kafka-coordinator-heartbeat-thread | test-group-id] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Connection to node 1 (localhost/127.0.0.1:61785) could not be established. Broker may not be available.
02:21:11.545 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=consumer-test-group-id-1, correlationId=14) due to node 1 being disconnected
02:21:11.545 [kafka-coordinator-heartbeat-thread | test-group-id] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] FindCoordinator request failed due to org.apache.kafka.common.errors.DisconnectException
Update: I removed the Spring dependencies completely; however, the problem persists, which suggests I am misconfiguring TestContainers.
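One possible reading of the logs, offered as an assumption rather than a confirmed diagnosis: the test never closes the producer or the consumer, so their background threads (`kafka-producer-network-thread`, `kafka-coordinator-heartbeat-thread`) may still be retrying after the container has been stopped, which would explain `Connection refused`. `LEADER_NOT_AVAILABLE` is commonly seen for a short time while a topic is being auto-created on first use. A minimal sketch of a variant that creates the topic explicitly and closes both clients inside the test method follows. The class name `ClosingClientsTests`, the helper `consumerConfig`, and the 10-second poll timeout are hypothetical choices, not part of the original setup.

```kotlin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.time.Duration

// Hypothetical helper: the same consumer settings as in the question, kept in one place.
fun consumerConfig(bootstrapServers: String): Map<String, Any> = mapOf(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG to 1,
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
    ConsumerConfig.GROUP_ID_CONFIG to "test-group-id"
)

@Testcontainers
class ClosingClientsTests {

    @Container
    private val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))

    @Test
    fun `publish and consume with clients closed before teardown`() {
        // Create the topic up front instead of relying on auto-creation at first send.
        val admin = AdminClient.create(
            mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers)
        )
        try {
            // One partition, replication factor 1 (single-broker container).
            admin.createTopics(listOf(NewTopic("topic", 1, 1.toShort()))).all().get()
        } finally {
            admin.close()
        }

        // `use` closes each client when its block exits, even if the assertion fails,
        // so the network and heartbeat threads stop before the container is torn down.
        KafkaProducer(
            mapOf(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafka.bootstrapServers),
            StringSerializer(),
            StringSerializer()
        ).use { producer ->
            KafkaConsumer(
                consumerConfig(kafka.bootstrapServers),
                StringDeserializer(),
                StringDeserializer()
            ).use { consumer ->
                consumer.subscribe(listOf("topic"))

                producer.send(ProducerRecord("topic", "Hello there!"))
                producer.flush()

                // Arbitrary timeout; first() throws if nothing arrives in time.
                val record = consumer.poll(Duration.ofSeconds(10)).first()
                assertEquals("Hello there!", record.value())
            }
        }
    }
}
```

If the warnings still appear with both clients closed, that would point away from this explanation and towards the container or network setup.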
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow