Consider the following set of documents :
{transaction: "A", date: "1999-10-27" }
{transaction: "B", date: "1999-10-26" }
{transaction: "C", date: "1999-10-26" }
{transaction: "A", date: "1999-10-27" }
{transaction: "A", date: "1999-12-25" }
{transaction: "B", date: "2000-10-25" }
I'm trying to add a filter that would only select the last document for each transaction, based of the value of the field date and obtain the following documents :
{transaction: "C", date: "1999-10-26" }
{transaction: "A", date: "1999-12-25" }
{transaction: "B", date: "2000-10-25" }
Also, a term aggregation wouldn't work because I also need to make a histogram aggregation (by year) on the resulting documents
{
1999: 2
}, {
2000: 1
}
CodePudding user response:
There are two ways to achieve what you need.
A. Using the collapse feature
GET test/_search
{
"_source": false,
"query": {
"match_all": {}
},
"collapse": {
"field": "transaction",
"inner_hits": [
{
"name": "latest",
"size": 1,
"sort": [
{
"date": {
"order": "desc"
}
}
]
}
]
}
}
B. Using a terms aggregation on the transaction field a top_hits sorted on date for the latest transaction
GET test/_search
{
"query": {
"match_all": {}
},
"aggs": {
"transactions": {
"terms": {
"field": "transaction"
},
"aggs": {
"latest": {
"top_hits": {
"size": 1,
"sort": [
{
"date": {
"order": "desc"
}
}
]
}
}
}
}
}
}
Using the above query, it is then trivial to figure out a date histogram in your application logic.
UPDATE:
If you really want ES to build that date histogram for you, you can achieve this by leveraging the scripted_metric aggregation and building the aggregation logic yourself. Note that this solution uses scripting and it might impair the performance of your cluster depending on the volume of your data.
POST test/_search
{
"size": 0,
"query": {
"match_all": {}
},
"aggs": {
"years": {
"scripted_metric": {
"init_script": "state.latest = [:]",
"map_script": """
// 1. record the latest year for each transaction
def key = doc['transaction.keyword'].value;
if (!state.latest.containsKey(key)) {
state.latest[key] = 0;
}
def year = doc['date'].value.getYear();
if (state.latest[key] < year) {
state.latest[key] = year;
}
""",
"combine_script": """
return state.latest
""",
"reduce_script": """
// 2. count how many documents per "latest" year
def years = [:];
states.stream().forEach(shardState -> {
shardState.keySet().stream().forEach(transaction -> {
def year = shardState[transaction].toString();
if (!years.containsKey(year)) {
years[year] = 0;
}
years[year] ;
});
});
return years;
"""
}
}
}
}
For instance, the above aggregation query would work and return the following, which is pretty much what you expect:
"aggregations" : {
"years" : {
"value" : {
"2000" : 1,
"1999" : 2
}
}
}
