Elasticsearch: [ingest] Enrich documents prior to indexing

Created on 10 Aug 2018  ·  6 Comments  ·  Source: elastic/elasticsearch

Enrichment at ingest

This issue describes a project that will leverage the ingest node to allow for enrichment of documents before they are indexed.

Below is a diagram that highlights the workflow. The red parts are new components.

[image: enrichment workflow diagram]

  • .enrich-* - index(es) managed by Elasticsearch that contain a highly optimized subset of the source data used for enrichment.
  • source index - a normal index managed externally (i.e. not by Elasticsearch) that contains the data used for enrichment.
  • enrich policy - a policy that describes how to synchronize the source index with the .enrich-* index. The policy will describe which fields to copy and how often to copy the fields.
  • decorate processor - an ingest node processor that reads from a .enrich-* index to mutate the raw data before it is indexed. The .enrich-* index will be data local to the decorate processor.

There are many moving parts so this issue will serve as a central place to track them.

Tasks

Enrich policy definition

  • [x] Define the enrich policy (@martijnvg) https://github.com/elastic/elasticsearch/pull/41003
  • [x] Rename enrich_key to match_field and enrich_values to enrich_fields.
  • [x] Remove type field and make the type a top level json object that contains all the configuration of an enrich policy. #45789
{
  "exact_match": {
    "match_field": "prsnl.id",
    "enrich_fields": [
      "prsnl.name.first",
      "prsnl.name.last"
    ],
    "indices": [
      "bar*",
      "foo"
    ],
    "query": {}
  }
}

instead of:

{
  "type": "exact_match",
  "indices": [
    "bar*",
    "foo"
  ],
  "match_field": "prsnl.id",
  "enrich_fields": [
    "prsnl.name.first",
    "prsnl.name.last"
  ],
  "query": {}
}
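
As a rough illustration of the restructuring above, the following sketch converts a policy from the old flat layout to the new layout, where the policy type becomes the single top-level key. The helper name to_nested_policy is hypothetical and is not part of Elasticsearch; it only shows how the two layouts relate.

```python
def to_nested_policy(flat_policy):
    """Wrap all settings except "type" under a top-level key named after the type."""
    settings = {key: value for key, value in flat_policy.items() if key != "type"}
    return {flat_policy["type"]: settings}


# The old-style policy from the issue description.
old_style = {
    "type": "exact_match",
    "indices": ["bar*", "foo"],
    "match_field": "prsnl.id",
    "enrich_fields": ["prsnl.name.first", "prsnl.name.last"],
    "query": {},
}

new_style = to_nested_policy(old_style)
```

Here new_style has one key, "exact_match", whose value holds the indices, match_field, enrich_fields and query settings.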

Enrich processor

  • [x] Write rally track for exact match processor.
  • [x] Add an enrich processor that uses the search api via node client in order to do the enrichment.
  • [x] Optimize the way msearch is executed for enrich processor lookups. Enrich indices always have a single shard, which allows us to easily optimize the execution of multiple search requests bundled together in a bulk. #43965
  • [x] Ensure that EnrichProcessorFactory always has access to the latest enrich policies.
    (Currently if multiple CS updates are combined then enrich policy changes may not be visible)
  • [x] Allow IngestService to register components that are updated before the processor factories.
  • [x] Register EnrichProcessorFactory as a component that keeps track of the policies.
  • [x] Rename the enrich_key option to field in enrich processor configuration. #45466
  • [x] Remove the set_from and targets options and introduce a target_field option that is in line with what the geoip processor is doing. The entire looked up document is placed as a json object under the target_field. #45466
  • [x] Change the enrich processor to not depend on the actual EnrichPolicy instance, just on the policy name. From the policy name, the enrich index alias can be resolved, and from that the currently active enrich index. The enrich index should have the match_field of the policy stored in its meta mapping; this is the only piece of information required to do the enrichment at ingest time. #45826
  • [x] Add overwrite parameter to enrich processor. #45029
  • [ ] Add template support to field and target_field parameters.
  • [ ] Include match count into document being enriched to see whether there were no matches or multiple matches.
  • [ ] Add an LRU cache that is only used when the enrich processor needs to make a remote call to do the lookup.
  • [x] Add support for match policy type.
  • [x] Add support for geo_share_match_policy type. #42639
  • [ ] Add support for ip_range_match policy type.
  • [ ] Explore warming the LRU cache based on entries from the previous enrich index.
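
To make the target_field and overwrite options above more concrete, here is a minimal sketch of the lookup step. It is not the actual processor: the enrich index is modeled as a plain dict keyed by the match field value, the function name enrich is hypothetical, and the assumption that overwrite=False leaves an existing target_field untouched is mine, not stated in the issue.

```python
def enrich(document, enrich_index, field, target_field, overwrite=True):
    """Look up document[field] and place the whole matched document under target_field."""
    match = enrich_index.get(document.get(field))
    if match is None:
        # No match: the document passes through unchanged.
        return document
    if target_field in document and not overwrite:
        # Assumed semantics: keep the existing value when overwrite is disabled.
        return document
    enriched = dict(document)
    enriched[target_field] = match
    return enriched


# Stand-in for a .enrich-* index, keyed by the match field value.
users = {"u1": {"first": "Ann", "last": "Lee"}}
result = enrich({"user_id": "u1", "msg": "hi"}, users, "user_id", "user")
```

The incoming document is copied rather than mutated, so a caller can compare the input and output to see whether a match was found.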

Policy management

  • [x] Think about bwc around enrich policy types.
    (add created version to EnrichPolicy?) (@jbaiera) #45021
  • [x] Execute force merge when running policy. (@jbaiera) #41969
  • [x] Introduce background process that removes enrich indices that are not referenced by an alias or no policy exists for an enrich index. ~The background process should mark indices for deletion first, and remove them in the next execution (To avoid deleting indices that have been freshly retired from the enrich alias and still potentially in use)~. Also the background process should not delete any indices that are tied to policies currently being executed - We don't want to throw out new indices that are currently being populated by a policy execution. (@jbaiera) #43746
  • [x] Add validation that enrich key fields / enrich values fields are not inside an array of objects (nested). (@jbaiera) #42452
  • [x] De-normalize nested data inside source index when executing policy.
  • [x] Stats (in memory)
  • [x] Error Handling
  • [x] Add description to .enrich index as _meta mapping to indicate that this index is managed by ES and shouldn't be modified in any way. (@jbaiera)
  • [x] Always drop the _id and _routing field from documents originating from source indices. This to ensure the uniqueness of documents. (@jbaiera)
  • [x] Overwrite specific index settings on enrich index: disable field data, global ordinals loading, shard allocation filtering, automatic refresh.
  • [x] Decide: if the force merge that runs as part of policy execution results in more than one segment, should we retry the force merge or fail the execute policy request?
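
The copy step of a policy execution can be pictured as follows. This is only a sketch under simplifying assumptions: a search hit is modeled as a dict with _id, _routing and _source keys, field names are flat rather than dotted paths, and to_enrich_document is a hypothetical helper name.

```python
def to_enrich_document(hit, match_field, enrich_fields):
    """Build the document stored in the enrich index from one source hit.

    Only the match field and the enrich fields are copied from _source.
    Metadata such as _id and _routing is never copied, so it is dropped.
    """
    source = hit["_source"]
    wanted = [match_field, *enrich_fields]
    return {name: source[name] for name in wanted if name in source}


hit = {
    "_id": "42",
    "_routing": "r1",
    "_source": {"email": "ann@example.com", "first": "Ann", "age": 30},
}
doc = to_enrich_document(hit, "email", ["first"])
```

The resulting doc contains only email and first; age is left behind because the policy does not list it.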

APIs

  • [x] Get policy API
  • [x] Execute policy API.
  • [x] Add manage_enrich privilege.
  • [x] Make policies immutable. The PUT policy api should fail when a policy already exists, so effectively this api can only return a 200 response code. If a policy needs to be changed then it first needs to be removed, or alternatively, a new policy under a different name should be added. (@hub-cap) #43604
  • [x] A policy should not be removed if a pipeline is still referencing it. (@hub-cap) #44438
  • [x] The delete policy api should first remove all enrich indices of a policy, before removing the policy from the cluster state. (@hub-cap) #45870
  • [x] Use has_privilege api as part of put policy api to check whether the user has sufficient privileges in source index. (@hub-cap) #43595
  • [x] Policy name validation. The validation should be similar to index name validation, because the policy name is used to create an index. (same validation as in MetaDataCreateIndexService#validateIndexOrAliasName) (@martijnvg)
  • [x] Replace current get and list APIs with another API that returns both a single policy and all policies. In both cases a list should be returned. For example
    GET _enrich/policy/users-policy (specific policy) and GET _enrich/policy (all policies). Both variants should always return a list of objects. And later also support:
    GET _enrich/policy/users-* and GET _enrich/policy/users-policy,users2-policy. (@hub-cap) #45705
  • [x] CRUD for enrich policy (@hub-cap) _enrich/policy/name
  • [x] Store enrich policy in an index (.enrich-policies ?) instead of in the cluster state. (@hub-cap) #47475
  • [x] Stats API
  • [ ] Integrate stats api with monitoring
  • [ ] Telemetry support
  • [ ] task api for execute ?wait_for_completion=false (@hub-cap)
  • [x] GET wildcard and comma separated policy names (@hub-cap)
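
The combined get/list behaviour described above (always return a list, accept wildcards and comma separated names) could look roughly like this. The sketch keeps policies in an in-memory list and uses Python's fnmatch for the wildcard; get_policies is a hypothetical name, and the real wildcard rules in Elasticsearch may differ.

```python
from fnmatch import fnmatchcase


def get_policies(policies, names=None):
    """Return matching policies as a list, whether zero, one or many match."""
    if names is None:
        # GET _enrich/policy: all policies.
        return list(policies)
    patterns = [pattern.strip() for pattern in names.split(",")]
    return [
        policy
        for policy in policies
        if any(fnmatchcase(policy["name"], pattern) for pattern in patterns)
    ]


stored = [
    {"name": "users-policy"},
    {"name": "users2-policy"},
    {"name": "hosts-policy"},
]
```

With this data, get_policies(stored, "users-policy") returns a one-element list and get_policies(stored, "users*") returns the first two entries.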

Misc

  • [x] Restart qa test
  • [x] Documentation
  • [x] Enable / Disable settings
  • [x] HLRC
  • [x] update Kibana roles for new role, to be done after the feature branch is merged to master; obsoleted by https://github.com/elastic/kibana/pull/40270
  • [ ] update stack docs for the new role, to be done after the feature branch is merged to master
  • [x] Transport client support. (@hub-cap) #46002
  • [ ] Integration with xpack usage api.

EDITS:

  • 2019-04-08: Changed the original description of this issue to reflect the current direction.
  • 2019-05-07: Updated after planning meeting.
Labels: :Core/Features/Ingest, >enhancement, Meta

All 6 comments

Closing as better alternatives for these use cases have been discussed.

Re-opening per further discussion.

Pinging @elastic/es-core-features

@jakelandis When I closed #20340 I started to work on a JDBC ingest plugin which was basically doing lookups to a 3rd party database. The way I designed it was by heavily using a cache to make lookups run as fast as possible with local data.

I had 2 strategies at the time:

  • cache hit by hit. The more you call ingest-jdbc, the more data you cache and the faster it runs.
  • cache on ingest startup. It starts an embedded in-memory database, creates a schema identical to the source one, and imports the table data into memory.

Of course with cache eviction and memory usage protection (i.e. don't load more than x kb/mb of data...).

Is that one of the things you have in mind?
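
The "cache hit by hit" strategy with eviction can be sketched with Python's standard functools.lru_cache. This is an illustration only, not the ingest-jdbc plugin: slow_lookup is a hypothetical stand-in for the remote database call, and the cache size of 2 is arbitrary.

```python
from functools import lru_cache

# Records every key that actually reached the (simulated) remote database.
remote_calls = []


def slow_lookup(key):
    """Hypothetical stand-in for a lookup against a 3rd party database."""
    remote_calls.append(key)
    return "value-for-" + key


@lru_cache(maxsize=2)
def cached_lookup(key):
    """Serve repeated keys from memory; evict the least recently used entry when full."""
    return slow_lookup(key)
```

Repeated lookups for the same key hit the cache and never reach slow_lookup, while the maxsize bound provides the memory protection mentioned in the comment.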

This would be beneficial to do real-time lookups within Elasticsearch.

Hey all,

Here's the summary of decisions and action items from the policy index cleanup meeting this Friday:

  • Decision: Old indices should be removed in a background process

    • Reasoning: A PolicyRunner could fail for any reason at any time (master changes, OOM, Cosmic Rays, etc.) and is linked to user action (The execute api). The process of cleaning up old indices should be always running. If a background process fails to clean up indices, it can try again in X amount of time without needing a user to be present.

    • Action Items:

    • A background process should be added to delete unused enrich indices

    • The indices that it will target are:

      • Not referenced by an enrich alias

      • Not linked to an existing policy (Unlikely, but we should handle this for sanity purposes)

    • ~The background process should mark indices for deletion first, and remove them in the next execution (To avoid deleting indices that have been freshly retired from the enrich alias and still potentially in use)~ Mark then delete made more sense originally, but instead we can check if any policies are currently executing and skip the cleanup process until no policies are in flight.

    • The background process should not delete any indices that are tied to policies currently being executed - We don't want to throw out new indices that are currently being populated by a policy execution.

      • We should keep some side data with the policy locks that can be used to detect if a policy has executed between requesting index information and marking them for cleanup. This will fix a potential race condition where the indices that are being cleaned up could contain stale information and could lead the maintenance process to delete live enrich indices



  • Decision: Policies should be immutable

    • Reasoning: If a policy definition is updated such that it is incompatible with a running processor (type of enrichment changes), then the next time it is executed the processor will fail and cause a poor user experience. The correct route for updating a policy would be to create a new policy and pipeline, execute the new policy, switch ingestion over to the new pipeline and remove the old pipeline and policy. An update API would be heavily discouraged, so we should just not have it.

    • Action Items:

    • The put policy action should fail if a policy already exists with the same ID

  • Decision: When deleting a policy, we should execute the delete process for all the policy's aliases before removing the policy

    • Reasoning: If a policy is deleted and a new completely different definition is added with the same policy id, we do not want processors to break on trying to read the data from the previous policy that hasn't been cleaned up yet

    • Action Items:

    • The delete api will hook into a portion of the same delete logic from the background process to delete indices for the policy that is being removed

    • The policy will only be removed from the cluster state once all of its indices have been confirmed to be removed.

  • Decision: Policies should not be deleted if a processor is currently referencing them

    • Reasoning: If a user deletes a policy that is being used, then the ingest processor will fail when the indices are deleted. Deleting a policy that is in use should be considered user error and guarded against.

    • Action Items:

    • When deleting a policy, the list of processors in the cluster state should be checked for references to the policy being deleted. The delete operation should fail if any pipelines are using the policy.
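
The four decisions above can be summarized in one small sketch. It is a simplified in-memory model, not the Elasticsearch implementation: policies is a dict of policy name to definition, index_owner maps each enrich index to the policy that created it, pipelines maps a pipeline id to the policy names it references, and all function and class names are hypothetical.

```python
class PolicyError(Exception):
    """Raised when a policy request must be rejected."""


def put_policy(policies, name, definition):
    # Policies are immutable: putting an existing name fails.
    if name in policies:
        raise PolicyError(f"policy [{name}] already exists")
    policies[name] = definition


def unused_indices(index_owner, aliased, policies, executing):
    # Skip cleanup entirely while any policy execution is in flight.
    if executing:
        return []
    # Target indices with no enrich alias, or whose policy no longer exists.
    return [
        index
        for index, owner in index_owner.items()
        if index not in aliased or owner not in policies
    ]


def delete_policy(policies, name, pipelines, index_owner):
    # Guard: refuse to delete a policy that a pipeline still references.
    users = sorted(pid for pid, used in pipelines.items() if name in used)
    if users:
        raise PolicyError(f"policy [{name}] is used by pipelines {users}")
    # Remove the policy's indices first, then the policy itself.
    for index in [i for i, owner in index_owner.items() if owner == name]:
        del index_owner[index]
    del policies[name]
```

The ordering inside delete_policy mirrors the decision that a policy is only removed once all of its indices are gone, so a new policy with the same id never reads leftover data.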
