Home > Back-end >  Elasticsearch - Calculate Delay between Timestamps
Elasticsearch - Calculate Delay between Timestamps

Time:02-01

How can I calculate the delay between timestamps without logstash, but with script_fields?

e.g. for this documents:

{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:00"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:01"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:03"
 }
}

I want to have a new field called "time_taken", so the expected documents should look like this:

{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:00",
  "time_taken": "1"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:01",
  "time_taken": "2"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:03"
 }
}

CodePudding user response:

The provided answer was inspired from Painless example in Transforms.

The solution uses Transforms API and it has some limitations I recommend you to check them and see if it's fine for your use-case Transform limitation.

First thing I created a mapping for the provided example:

PUT myindex
{
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "fields": {
          "keywords": {
            "type": "keyword"
          }
        }
      },
      "timestamp": {
        "type": "date"
      }
    }
  }
}

and insert some documents:

POST myindex/_doc
{
  "name": "test1",
  "timestamp":"2022-01-27T19:48:11Z"
}
POST myindex/_doc
{
  "name": "test1",
  "timestamp":"2022-01-27T19:50:11Z"
}
POST myindex/_doc
{
  "name": "test1",
  "timestamp":"2022-01-27T19:53:11Z"
}
POST myindex/_doc
{
  "name": "test2",
  "timestamp":"2022-01-27T19:35:11Z"
}
POST myindex/_doc
{
  "name": "test2",
  "timestamp":"2022-01-27T19:36:11Z"
}

Using the Transform API we can calculate for each aggregation the time length for each term:

POST _transform/_preview
{
  "source": {
    "index": "myindex"
  },
  "dest": {
    "index": "destindex"
  },
  "pivot": {
    "group_by": {
      "name": {
        "terms": {
          "field": "name.keywords"
        }
      }
    },
    "aggregations": {
      "latest_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L;",
          "map_script": """
          def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
          if (current_date > state.timestamp_latest)
          {state.timestamp_latest = current_date;}
        """,
          "combine_script": "return state",
          "reduce_script": """
          def last_doc = '';
          def timestamp_latest = 0L;
          for (s in states) {if (s.timestamp_latest > (timestamp_latest))
          {timestamp_latest = s.timestamp_latest;}}
          return timestamp_latest
        """
        }
      },
      "first_value": {
         "scripted_metric": {
          "init_script": "state.timestamp_first = 999999999999999L;",
          "map_script": """
          def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
          if (current_date < state.timestamp_first)
          {state.timestamp_first = current_date;}
        """,
          "combine_script": "return state",
          "reduce_script": """
          def last_doc = '';
          def timestamp_first = 999999999999999L;
          for (s in states) {if (s.timestamp_first < (timestamp_first))
          {timestamp_first = s.timestamp_first;}}
          return timestamp_first
        """
        }
      },
      "time_length": {
        "bucket_script": {
          "buckets_path": {
            "min": "first_value.value",
            "max": "latest_value.value"
          },
          "script": "(params.max - params.min)/1000"
        }
      }
    }
  }
}

The output is as follow:

{
  "preview" : [
    {
      "time_length" : 300.0,
      "name" : "test1",
      "first_value" : 1643312891000,
      "latest_value" : 1643313191000
    },
    {
      "time_length" : 60.0,
      "name" : "test2",
      "first_value" : 1643312111000,
      "latest_value" : 1643312171000
    }
  ],
  "generated_dest_index" : {
    "mappings" : {
      "_meta" : {
        "_transform" : {
          "transform" : "transform-preview",
          "version" : {
            "created" : "7.15.1"
          },
          "creation_date_in_millis" : 1643400080594
        },
        "created_by" : "transform"
      },
      "properties" : {
        "name" : {
          "type" : "keyword"
        }
      }
    },
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "auto_expand_replicas" : "0-1"
      }
    },
    "aliases" : { }
  }
}

What's the script doing?

As you can see we are creating a term aggregation on the field name.keywords. We used a scripted metric aggregation that has 4 steps:

  • init_script: initiate a state, it's a space where you initialize your variables and their scope are global for all shards
  • map_script: this step execute the code for each document, means you can iterate or do complex calculation on your documents like if you were coding in a high-level programming language like python or java (avoid doing heavy calculation or it will slower your aggregation)
  • combine_script: here we tell elasticsearch to return the state from each shard
  • reduce_script: it's the final step where we iterate over the result of each shard from the previous step (aka combine script) to calculate the first/latest timestamp for each aggregation.

Finally, in the bucket script, we calculate the difference given the first_value and latest_value, we divided by 1000 because the timestamp field is stored in epoch millis. time_length unit is in seconds.

More information about scripted metric aggregation: Scripted metrics.

  •  Tags:  
  • Related