'Check Kafka client connection status at runtime
I built a Java Kakfa client using the library Apache Kafka 2.8.0. Sometime, when the Kafka server become unreachable for some reasons, the Kafka log keeps to print over and over again (approximately every 25 seconds), this WARN:
[WARN ][NetworkClient] - [[Consumer clientId=ROMLAPA275, groupId=aws-group-internal] Connection to node -1 (serverhost.internal.net/10.95.12.12:9092) could not be established. Broker may not be available.]
[WARN ][NetworkClient$DefaultMetadataUpdater] - [[Consumer clientId=ROMLAPA275, groupId=aws-group-internal] Bootstrap broker serverhost.internal.net:9092 (id: -1 rack: null) disconnected]
It isn't an Exception, so I cannot catch it. Is there a way to check the connection status in order to do something when the client cannot reach the Kafka server?
I tried to get something in debug and I found the connect status in this path:
and I think it means there is a wrong status, but I can reach it in debug.
Anyone know how can I do this? Thanks!
Below the code.
This is the Kafka client class, as a Thread:
public KafkaConsumerLoop(int id, String groupId, List<String> topics, String serverConfig, Context context, String clientId) {
this.topics = topics;
this.context = context;
System.out.println("Instanciate Kafka consumer:");
System.out.println(" - server: "+serverConfig);
System.out.println(" - topic: "+this.topics);
System.out.println(" - clientId: "+clientId);
System.out.println(" - groupId: "+groupId);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverConfig);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
this.consumer = new KafkaConsumer<>(props);
System.out.println("Kafka consumer instanciated");
}
I run it in another class:
/** Create and run consumer as a Thread*/
private void consumer() {
Context context = (Context)ContextStore.get().clone();
context.setProject(this.getProject());
kafkaConsumerLoop = new KafkaConsumerLoop(
1,
this.groupId,
Arrays.asList(topicConsumer),
this.bootstrapServersConfig,
context,
CLIENT_ID);
kafkaConsumerThread = new Thread(kafkaConsumerLoop);
kafkaConsumerThread.setName("consumer_thread_"+kafkaConsumerThread.getId());
kafkaConsumerThread.start();
}
as a loop:
@Override
public void run() {
try {
this.consumer.subscribe(topics);
System.out.println("Consumer running on topic: "+topics);
System.out.println("Current consumer thread name - "+Thread.currentThread().getName()
+" - ID: "+Thread.currentThread().getId());
ConsumerRecords<String, String> records = null;
while (!closed.get()) {
records = consumer.poll(Duration.ofMillis(100));
records: for (ConsumerRecord<String, String> record : records) {
System.out.println("Current consumer thread name - "+Thread.currentThread().getName()
+" - ID: "+Thread.currentThread().getId());
String clientId = "";
// If the header contains the own producer Client ID, skip this message
for (Header header : record.headers()) {
if(header.key().equals(KafkaClientCommunication.CLIENT_ID_HEADER_KEY))
if(new String(header.value()).equals(InetAddress.getLocalHost().getHostName())) {
BmsCommon.LOG.debug("Received message from us...");
continue records;
} else {
clientId = new String(header.value());
System.out.println("Received message from client-id: "+new String(header.value()));
}
}
...
Thanks.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|