'How to detect logstash input connection error
How can I monitor and detect errors when connecting kafka to logstash.
Say for example my kafka broker is down and no connection is established between kafka and logstash.
Is there a way in to get the monitor the connection status between logstash and kafka? I can query logstash logs (but I don't think it is the appropriate way) and I tried to use logstash monitoring API (for example localhost:9600/_node/stats/pipelines?pretty) but no api gives me the connection status is off
Thank you in advance
Solution 1:[1]
If you have an elastic agent
or a metricbeat agent
installed on the Kafka node, you can configure the agent to monitor them using their Kafka specific module.
For getting the connection status from logstash as you mentioned, you can also configure your logstash config to grab the status from the log message.
Sample document in elasticsearch :
{
"_index": "topicname",
"_type": "_doc",
"_id": "ulF8uH0BK9MbBSR7DPEw",
"_version": 1,
"_score": null,
"fields": {
"@timestamp": [
"2022-05-09T10:27:56.956Z"
],
"@version": [
"1"
],
"@version.keyword": [
"1"
],
"message": [
"{\"requestMethod\":\"GET\",\"headers\":{\"content-type\":\"application/json\",\"user-agent\":\"PostmanRuntime/7.XX.XX\",\"accept\":\"*/*\",\"postman-token\":\"11224442345223\",\"host\":\"localhost:2300\",\"accept-encoding\":\"gzip, deflate, br\",\"connection\":\"keep-alive\",\"content-length\":\"44\"},\"body\":{\"category\":\"CAT\",\"noise\":\"purr\"},\"query\":{},\"requestUrl\":\"http://localhost:2300/kafka\",\"protocol\":\"HTTP/1.1\",\"remoteIp\":\"1\",\"requestSize\":302,\"userAgent\":\"PostmanRuntime/7.XX.X\",\"statusCode\":200,\"response\":{\"success\":true,\"message\":\"Kafka Details are added\",\"data\":{\"kafkaData\":{\"_id\":\"12gvsddwqbwrfteacr313rcet5\",\"category\":\"DOG\",\"noise\":\"bark\",\"__v\":0},\"postData\":{\"category\":\"DOG\",\"noise\":\"bark\"}}},\"latency\":{\"seconds\":0,\"nanos\":61000000},\"responseSize\":193}"
]
} }
Below configuration can be added to fetch the status:
input {
kafka {
topics => ["topicname"]
bootstrap_servers => "11.11.11.11:1111"
}
}
filter{
mutate { add_field => { "StatusCode" => "%{[message][0][status]}" } }
}
output {
elasticsearch {
hosts => ["11.11.11.12:9200"]
index => "topic-name-index"
}
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |