How to reuse an added field in the output with Logstash

My use case is simple. I have Kafka as input and some indexes in Elasticsearch (topic name === index name), where the index names match the entities we use in our application, such as "buildings", "cars", "bus" (just for example).

input {
    kafka {
        bootstrap_servers => "kafka:29092"
        topics => ['cars', 'bus']
        decorate_events => true
        codec => 'json'
    }
}

filter {
    if [@metadata][kafka][topic] == 'cars' {
        mutate {
            convert => {
                "car_id" => "integer"
            }
            add_field => {
                'id' => "%{car_id}"
            }
        }
    }

    if [@metadata][kafka][topic] == 'bus' {
        mutate {
            convert => {
                "bus_id" => "integer"
            }
            add_field => {
                'id' => "%{bus_id}"
            }
        }
    }
}

output {
    if [@metadata][kafka][topic] == 'cars' {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            doc_as_upsert => true
            action => 'update'
            document_id => '%{car_id}'
        }

        if [isDelete] {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                action => 'delete'
                document_id => '%{car_id}'
            }
        }
    }

    if [@metadata][kafka][topic] == 'bus' {
        elasticsearch {
            hosts => "elasticsearch:9200"
            user => "${ELASTICSEARCH_USERNAME}"
            password => "${ELASTICSEARCH_PASSWORD}"
            index => "%{[@metadata][kafka][topic]}"
            doc_as_upsert => true
            action => 'update'
            document_id => '%{bus_id}'
        }

        if [isDelete] {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                action => 'delete'
                document_id => '%{bus_id}'
            }
        }
    }
}

To update or delete documents from Logstash, I need to use their id. But as you can see, I don't want 50 conditions, one per entity; I would rather factorize.

I would like to reuse the "id" field I created in the filter section in the output, as the value of document_id.

Do you have any idea how I could do this?
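
For reference, a factorized output along the lines I am describing might look like the sketch below. It assumes every topic's filter block adds the common "id" field (as in the filter above), and it reuses the sprintf reference %{[id]} plus the Kafka topic metadata so a single pair of elasticsearch blocks covers all entities:

    output {
        if [isDelete] {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                action => 'delete'
                document_id => '%{[id]}'
            }
        } else {
            elasticsearch {
                hosts => "elasticsearch:9200"
                user => "${ELASTICSEARCH_USERNAME}"
                password => "${ELASTICSEARCH_PASSWORD}"
                index => "%{[@metadata][kafka][topic]}"
                doc_as_upsert => true
                action => 'update'
                document_id => '%{[id]}'
            }
        }
    }

Note that sprintf references like %{[id]} work in plugin option values such as document_id, but not in conditionals, where you compare fields directly (e.g. if [isDelete]).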



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
