'How to detect logstash input connection error

How can I monitor and detect errors when connecting kafka to logstash.

Say for example my kafka broker is down and no connection is established between kafka and logstash.

Is there a way in to get the monitor the connection status between logstash and kafka? I can query logstash logs (but I don't think it is the appropriate way) and I tried to use logstash monitoring API (for example localhost:9600/_node/stats/pipelines?pretty) but no api gives me the connection status is off

Thank you in advance



Solution 1:[1]

If you have an elastic agent or a metricbeat agent installed on the Kafka node, you can configure the agent to monitor them using their Kafka specific module.

For getting the connection status from logstash as you mentioned, you can also configure your logstash config to grab the status from the log message.

Sample document in elasticsearch :

{
    "_index": "topicname",
    "_type": "_doc",
    "_id": "ulF8uH0BK9MbBSR7DPEw",
    "_version": 1,
    "_score": null,
    "fields": {
      "@timestamp": [
        "2022-05-09T10:27:56.956Z"
            ],
        "@version": [
            "1"
        ],
        "@version.keyword": [
            "1"
        ],
        "message": [
            "{\"requestMethod\":\"GET\",\"headers\":{\"content-type\":\"application/json\",\"user-agent\":\"PostmanRuntime/7.XX.XX\",\"accept\":\"*/*\",\"postman-token\":\"11224442345223\",\"host\":\"localhost:2300\",\"accept-encoding\":\"gzip, deflate, br\",\"connection\":\"keep-alive\",\"content-length\":\"44\"},\"body\":{\"category\":\"CAT\",\"noise\":\"purr\"},\"query\":{},\"requestUrl\":\"http://localhost:2300/kafka\",\"protocol\":\"HTTP/1.1\",\"remoteIp\":\"1\",\"requestSize\":302,\"userAgent\":\"PostmanRuntime/7.XX.X\",\"statusCode\":200,\"response\":{\"success\":true,\"message\":\"Kafka Details are added\",\"data\":{\"kafkaData\":{\"_id\":\"12gvsddwqbwrfteacr313rcet5\",\"category\":\"DOG\",\"noise\":\"bark\",\"__v\":0},\"postData\":{\"category\":\"DOG\",\"noise\":\"bark\"}}},\"latency\":{\"seconds\":0,\"nanos\":61000000},\"responseSize\":193}"
        ]
    } }

Below configuration can be added to fetch the status:

  input {
    kafka {
      topics => ["topicname"]
      bootstrap_servers => "11.11.11.11:1111"
    }
  }
  
  filter{
        mutate { add_field => { "StatusCode" => "%{[message][0][status]}" } }
  }
  
  output {
   elasticsearch {
      hosts => ["11.11.11.12:9200"]
      index => "topic-name-index"
   }
  }

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1