ElasticSearch Aggregation over Top Hits

I have data as follows:

{"action":"CREATE","docs":1,"date":"2016 Jun 26 12:00:12","userid":"1234"}
{"action":"REPLACE","docs":2,"date":"2016 Jun 27 12:00:12","userid":"1234"}
{"action":"REPLACE","docs":1,"date":"2016 Jun 27 13:00:12","userid":"1234"}
{"action":"CREATE","docs":1,"date":"2016 Jun 28 12:00:12","userid":"3431"}
{"action":"REPLACE","docs":2,"date":"2016 Jun 28 13:00:12","userid":"3431"}
{"action":"CREATE","docs":1,"date":"2016 Jun 29 12:00:12","userid":"9999"}

To get the latest record for each unique user, ordered by date (descending), I used a top_hits aggregation like the one below:

"aggs": {
  "user_bucket": {
    "terms": {
      "field": "userid"
    },
    "aggs": {
      "user_latest_count": {
        "top_hits": {
          "size": 1,
          "sort": [
            {
              "data": {
                "order": "desc"
              }
            }
          ],
          "_source": {
            "include": [
              "docs"
            ]
          }
        }
      }
    }
  }
}

The result of the above query is as follows:

{"action":"REPLACE","docs":1,"date":"2016 Jun 27 13:00:12","userid":"1234"}
{"action":"REPLACE","docs":2,"date":"2016 Jun 28 13:00:12","userid":"3431"}
{"action":"CREATE","docs":1,"date":"2016 Jun 29 12:00:12","userid":"9999"}

Now, I want to aggregate this further so that the result is as follows:

{"sum_of_different_buckets": 4}

But I am not sure how to SUM the "docs" field values from the result obtained above.

Solution 1:[1]

You can nest aggregations inside aggregations arbitrarily to extract the summarized data that you require. Maybe the sample below works:

"aggs" : {
    "sum_of_different_buckets" : { "sum" : { "field" : "docs" } }
}
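
Run as a complete search request (a sketch, assuming the rahul_test index used elsewhere in this article), it looks like this:

POST rahul_test/_search
{
  "size": 0,
  "aggs": {
    "sum_of_different_buckets": { "sum": { "field": "docs" } }
  }
}

Note, however, that a top-level sum runs over all matching documents, so on the sample data it would return 8 (1+2+1+1+2+1) rather than the desired 4.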

Solution 2:[2]

You can have other aggregations at the same level as top_hits, but you cannot have any sub-aggregation below top_hits; this is not supported by Elasticsearch. Here is the link to the GitHub issue.

But if you want to have the sum at the same level, you may use the approach below.

"aggs": {
    "top_hits_agg": {
        "top_hits": {
            "size": 10,
            "_source": {
              "includes": ["docs"]
            }
        }
    },
    "sum_agg": {
        "sum": {
            "field": "docs"
        }
    }
}
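
Applied to the question's terms aggregation, this parallel layout would look like the sketch below; note that sum_agg then sums every docs value in each user's bucket, not just the value from the latest hit:

POST rahul_test/_search
{
  "size": 0,
  "aggs": {
    "user_bucket": {
      "terms": {
        "field": "userid"
      },
      "aggs": {
        "top_hits_agg": {
          "top_hits": {
            "size": 1,
            "sort": [
              { "date": { "order": "desc" } }
            ],
            "_source": {
              "includes": ["docs"]
            }
          }
        },
        "sum_agg": {
          "sum": {
            "field": "docs"
          }
        }
      }
    }
  }
}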

Solution 3:[3]

You can use scripted_metric and sum_bucket pipeline aggregations. The scripted_metric aggregation allows you to write your own map-reduce logic, and hence you can return a single metric for each term.

POST rahul_test/_search
{
  "size": 0,
  "aggs": {
    "user_bucket": {
      "terms": {
        "field": "userid",
        "size": 10000,
        "min_doc_count": 1
      },
      "aggs": {
        "user_latest_count": {
          "scripted_metric": {
            "init_script": "state.timestamp_latest = 0L; state.last_value = 0",
            "map_script": "def date_as_millis = doc['date'].getValue().toInstant().toEpochMilli(); if (date_as_millis > state.timestamp_latest) { state.timestamp_latest = date_as_millis; state.last_value = doc.docs.value;}",
            "combine_script": "return state",
            "reduce_script": "def last_value = 0; def timestamp_latest = 0L; for (s in states) {if (s.timestamp_latest > (timestamp_latest)) {timestamp_latest = s.timestamp_latest; last_value = s.last_value;}} return last_value;"
          }
        }
      }
    },
    "sum_user_latest_counts": {
      "sum_bucket": {
        "buckets_path": "user_bucket>user_latest_count.value"
      }
    }
  }
}

  • The init_script creates two fields, timestamp_latest and last_value, in the state object (one state object per shard).
  • The map_script is executed once for each document collected into the buckets of the parent terms aggregation. It defines date_as_millis from the document's date field, compares it with state.timestamp_latest, and updates state.timestamp_latest and state.last_value if the document is the latest one seen so far on that shard.
  • The combine_script returns the state from each shard.
  • The reduce_script iterates over the states returned by the shards, compares their timestamp_latest values, and returns the single last_value belonging to the document with the latest timestamp (see the annotated scripts below).
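
For readability, here are the same four Painless scripts written out with comments; this is just a reformatted view of the one-liners in the query above, not different logic:

// init_script — runs once per shard before any documents are collected
state.timestamp_latest = 0L;  // latest date seen on this shard (epoch millis)
state.last_value = 0;         // docs value of the document with that date

// map_script — runs for every document collected in the bucket
def date_as_millis = doc['date'].getValue().toInstant().toEpochMilli();
if (date_as_millis > state.timestamp_latest) {
  state.timestamp_latest = date_as_millis;
  state.last_value = doc.docs.value;
}

// combine_script — runs once per shard; ships the shard's state to the coordinator
return state;

// reduce_script — runs once on the coordinating node over all shard states
def last_value = 0;
def timestamp_latest = 0L;
for (s in states) {
  if (s.timestamp_latest > timestamp_latest) {
    timestamp_latest = s.timestamp_latest;
    last_value = s.last_value;
  }
}
return last_value;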

At this point, we have the latest docs value for each userid. Then we use a sum_bucket pipeline aggregation to sum all the latest docs values, which returns the value of 4:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 6,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "user_bucket" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "1234",
          "doc_count" : 3,
          "user_latest_count" : {
            "value" : 1
          }
        },
        {
          "key" : "3431",
          "doc_count" : 2,
          "user_latest_count" : {
            "value" : 2
          }
        },
        {
          "key" : "9999",
          "doc_count" : 1,
          "user_latest_count" : {
            "value" : 1
          }
        }
      ]
    },
    "sum_user_latest_counts" : {
      "value" : 4.0
    }
  }
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Alp
Solution 2: Saket Gupta
Solution 3: Rahul Singhai