Elasticsearch: Streamstats function in Elastic ?

Created on 19 Jul 2019 · 11 comments · Source: elastic/elasticsearch

Hi all,

I would like to know whether you would consider implementing a streamstats-like function (as in Splunk) in Elasticsearch.

I've already asked on the www.elastic.co forums but didn't get any answers.

Thanks

:AnalyticAggregations >feature feedback_needed

All 11 comments

Pinging @elastic/es-analytics-geo

Could you elaborate a bit more on what kind of functionality you're looking for? We've briefly discussed streaming aggregations in the past, but the distributed nature of Elasticsearch makes these kinds of operations difficult to do.

E.g. Ingest pipeline processors are theoretically capable of calculating these streaming stats, but two entities which should be accumulated together for statistical purposes might be sent to different nodes. There is no guarantee that they will land on the same ingest node, and so the architecture would need some way to reconcile multiple streaming quantities at a later point (merging sharded, partial answers). And you have to deal with the long-lived nature of accumulation windows and persisting temporary state. If the accumulation window is one day, you would want to persist that state somewhere so losing an ingest node doesn't ruin the statistic.

It's all doable but starts to become very complicated quickly :)

A notable mention: ML's dataframe project fills this role as a non-streaming, post-processing step.

The goal of this functionality would be to compute the difference between consecutive elements, in time order.
E.g.: my elements sorted by ascending time are "A B C D E"; the computation I would like to do is A-B=X1, B-C=X2, C-D=X3, D-E=X4 ..., and then compute the average (or stdev) of X1, X2, X3, X4.

I understand the complexity you are describing. So is the only alternative to use ML's data frame project? :)

Thanks

Data Frame Transforms (soon to be called simply "Transforms") really are the right place and way to do this. As a simple example:

PUT events/
{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date" ,
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}
PUT events/_doc/1
{
  "event-type": "login",
  "@timestamp": "2019-04-23 09:15:00",
  "session": "abc-1",
  "user": "alice"
}
PUT events/_doc/2
{
  "event-type": "login",
  "@timestamp": "2019-04-23 09:38:05",
  "session": "abrd-1",
  "user": "bob"
}
PUT events/_doc/3
{
  "event-type": "logout",
  "@timestamp": "2019-04-23 12:06:00",
  "session": "abc-1",
  "user": "alice"
}
PUT events/_doc/4
{
  "event-type": "login",
  "@timestamp": "2019-04-23 10:21:43",
  "session": "klm-1",
  "user": "bob"
}
PUT events/_doc/5
{
  "event-type": "logout",
  "@timestamp": "2019-04-23 11:56:36",
  "session": "klm-1",
  "user": "bob"
}
PUT events/_doc/6
{
  "event-type": "login",
  "@timestamp": "2019-04-23 13:19:00",
  "session": "xyz-1",
  "user": "alice"
}
PUT events/_doc/7
{
  "event-type": "logout",
  "@timestamp": "2019-04-23 16:59:59",
  "session": "xyz-1",
  "user": "alice"
}
PUT events/_doc/8
{
  "event-type": "logout",
  "@timestamp": "2019-04-23 17:45:00",
  "session": "abrd-1",
  "user": "bob"
}
PUT events/_doc/9
{
  "event-type": "logout",
  "@timestamp": "2019-04-24 08:45:00",
  "session": "blud-1",
  "user": "bob"
}
POST _data_frame/transforms/_preview
{
  "source": {
    "index": "events"
  },
  "pivot": {
    "group_by": {
      "session": {
        "terms": {
          "field": "session.keyword"
        }
      },
      "user": {
        "terms": {
          "field": "user.keyword"
        }
      }
    },
    "aggregations": {
      "max_time": {
        "max": {
          "field": "@timestamp"
        }
      },
      "min_time": {
        "min": {
          "field": "@timestamp"
        }
      },
      "lengthofsession": {
        "scripted_metric": {
          "map_script": """if(doc['event-type.keyword'].value == 'login'){ state.login = doc['@timestamp'].value}
          if(doc['event-type.keyword'].value == 'logout'){ state.logout = doc['@timestamp'].value}
          """,
          "combine_script": """return state""",
          "reduce_script": """ 
          if (states[0].login != null  && states[0].logout != null){
          def login = states[0].login.millis;
          def logout = states[0].logout.millis;
          return logout-login}
          """
        }
      }
    }
  }
}

would yield the following transform:

  "preview" : [
    {
      "session" : "abc-1",
      "lengthofsession" : 10260000,
      "max_time" : "2019-04-23 12:06:00",
      "user" : "alice",
      "min_time" : "2019-04-23 09:15:00"
    },
    {
      "session" : "abrd-1",
      "lengthofsession" : 29215000,
      "max_time" : "2019-04-23 17:45:00",
      "user" : "bob",
      "min_time" : "2019-04-23 09:38:05"
    },
    {
      "session" : "blud-1",
      "lengthofsession" : null,
      "max_time" : "2019-04-24 08:45:00",
      "user" : "bob",
      "min_time" : "2019-04-24 08:45:00"
    },
    {
      "session" : "klm-1",
      "lengthofsession" : 5693000,
      "max_time" : "2019-04-23 11:56:36",
      "user" : "bob",
      "min_time" : "2019-04-23 10:21:43"
    },
    {
      "session" : "xyz-1",
      "lengthofsession" : 13259000,
      "max_time" : "2019-04-23 16:59:59",
      "user" : "alice",
      "min_time" : "2019-04-23 13:19:00"
    }
  ],
...

This is just using the _preview endpoint, but of course the data frame transform could write this to a new index, and do so continuously (as of v7.3).
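As a rough, hypothetical sketch (the transform id, destination index name, and sync delay are made up, and the aggregations are abbreviated to just the min/max times), a continuous version of the above could look something like this:

PUT _data_frame/transforms/session_lengths
{
  "source": {
    "index": "events"
  },
  "dest": {
    "index": "session_lengths"
  },
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "session": { "terms": { "field": "session.keyword" } },
      "user": { "terms": { "field": "user.keyword" } }
    },
    "aggregations": {
      "max_time": { "max": { "field": "@timestamp" } },
      "min_time": { "min": { "field": "@timestamp" } }
    }
  }
}
POST _data_frame/transforms/session_lengths/_start

The sync.time block is what makes the transform continuous: it periodically checks for documents newer than the last checkpoint (minus the delay) and updates the affected buckets in the destination index.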

Thanks for your detailed answer @richcollier :)
If I understand properly, you compute the difference between two elements (login and logout) for each session.
But I'm not sure whether your example would work with a set like Car1 = (31€, 15€, 16€, ...), Car2 = (12€, 50€, 34€, ...), Car3 = (37€, 26€, 42€, ...)?

Thanks

No problem!

By the way, using a scripted_metric is a short-term solution. Transforms will hopefully soon provide a native "transaction" function.

I'm just not following your example very well. So, what are you subtracting in your Car data set?

In other words, given what you've written for Car1, Car2, and Car3, what is the thing that you're attempting to calculate?

Sorry, my example was not clear.

Sets of dates when each type of car was sold (each set is ordered by ascending time):

  • Car1 =(date1, date2, date3...)
  • Car2 =(date4, date5, date6...)

For each type of car, I would like to do the following computation:

  • Car1: date1 - date2 = X1, date2 - date3 = X2, ...

Then:
Average(X1,X2,...) = Y
Stdev(X1,X2,...) = Z

So, as you said, these computations can be done using scripted_metric or the transaction function?
Are there no other possibilities?

@zaule There are a handful of possibilities:

  • scripted_metric is definitely possible, and should not be difficult to implement. But ALL date values for each car need to be held in heap memory, so that we can make sure they are sorted appropriately before taking the differences. These would be stored as long values, so depending on the number of values this could get expensive. I can write up an example script for you if you would like.
  • I think a combination of composite_aggs + serial_difference + average_bucket would work (a rough sketch follows this list). But I think a terms aggregation against the date field may have to be done to make sure things are sorted appropriately per bucket. Bucket ordering is important, of course, because of how we want to do the differences.
  • There MAY be a way to do this with data_frame transforms. It would be done in combination with a scripted metric (not too difficult) and would use less memory than the first option, but every difference, combined with the car, would have to be written to a doc. So you are trading larger disk usage for less memory. Then, on the destination index, you could run your statistics against all the diffs for each car.
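A very rough sketch of the second option, assuming a hypothetical sales index with car.keyword and @timestamp fields (the hourly interval is arbitrary, and empty buckets or multiple sales falling into one bucket make this only an approximation of per-sale deltas):

POST sales/_search
{
  "size": 0,
  "aggs": {
    "by_car": {
      "terms": {
        "field": "car.keyword"
      },
      "aggs": {
        "per_hour": {
          "date_histogram": {
            "field": "@timestamp",
            "fixed_interval": "1h",
            "min_doc_count": 1
          },
          "aggs": {
            "sale_time": {
              "min": { "field": "@timestamp" }
            },
            "time_diff": {
              "serial_diff": { "buckets_path": "sale_time", "lag": 1 }
            }
          }
        },
        "avg_diff": {
          "avg_bucket": { "buckets_path": "per_hour>time_diff" }
        }
      }
    }
  }
}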

How many date values are we talking about total? I think that will dictate the path forward in combination with how much memory your cluster has for its heap.

Here is an example scripted metric (variable names may have to change)

      "time_delta_stats": {
        "scripted_metric": {
        "init_script" : "state.timestamps = []", 
        "map_script" : "state.timestamps.add(doc['@timestamp'].value.toInstant().toEpochMilli())",
        "combine_script" : "return state.timestamps",
        "reduce_script" : """
        def all_timestamps = [];
        for (s in states) { 
              for (t in s) { 
                all_timestamps.add(t); 
              }
            }
        all_timestamps.sort(Comparator.naturalOrder());
        def deltas = [];
        for (int i = 0; i < all_timestamps.size() - 1; i++) {
          deltas.add(all_timestamps.get(i+1) - all_timestamps.get(i));
        }
        if (deltas.size() == 0) {
          return null;
        }
        def sum = 0L;
        for(d in deltas) {
          sum += d;
        }
        double mean = sum / deltas.size();
        double std = 0.0;
        def min = Long.MAX_VALUE;
        def max = 0L;
        for (d in deltas) {
          std += Math.pow(d - mean, 2.0);
          if (d < min) {
            min = d;
          }
          if (d > max) {
            max = d;
          }
        }
        std = std / deltas.size();
        def variance = std;
        std = Math.sqrt(std);
        return ['mean': mean, 
        'std': std, 
        'variance': variance, 
        'min': min,
        'max': max];
        """
      }
      }

This could be nested within a terms agg over the car type.
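For example, assuming a hypothetical sales index with the car type in a car.keyword field (both names are placeholders), the nesting could look like the sketch below, where "..." stands in for the scripts shown above:

POST sales/_search
{
  "size": 0,
  "aggs": {
    "by_car": {
      "terms": {
        "field": "car.keyword"
      },
      "aggs": {
        "time_delta_stats": {
          "scripted_metric": {
            "init_script": "...",
            "map_script": "...",
            "combine_script": "...",
            "reduce_script": "..."
          }
        }
      }
    }
  }
}

Each by_car bucket would then carry its own mean/std/variance/min/max of the time deltas.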

Depending on your ES version and field names, the specific time handling line

        "map_script" : "state.timestamps.add(doc['@timestamp'].value.toInstant().toEpochMilli())",

may have to change.
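For example, on a 6.x cluster, where date doc values are Joda-time objects rather than ZonedDateTime, the equivalent line would typically be (this is an assumption about your version and field name):

        "map_script" : "state.timestamps.add(doc['@timestamp'].value.millis)",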

Thank you very much for your help.
I will try this.

No further feedback received. @zaule Please reopen this ticket with more info if this is still a problem.
