Elasticsearch: Duplicate handling with rollover

Created on 24 Jul 2019 · 7 comments · Source: elastic/elasticsearch

Say your data shippers sometimes send duplicates. With time-based indices it used to be possible to remove duplicates by using Logstash's fingerprint filter to derive the document _id from a hash of the document's content, and by always indexing into the index that matched the value of the @timestamp field. That way, two copies of the same document would land in the same index under the same _id, even if one copy is sent before midnight UTC and the other after midnight, when a new daily index is created.
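
For illustration, here is a minimal Python sketch of that pre-rollover setup (the index pattern, field names and hash choice are illustrative, not the exact output of the fingerprint filter): the _id is a hash of the event content and the target index is derived from the event's @timestamp, so a re-sent event always lands in the same daily index under the same _id and overwrites itself instead of creating a duplicate.

import hashlib
from datetime import datetime, timezone

def route(event: dict) -> tuple[str, str]:
    # Fingerprint the event content; any stable serialization works here.
    body = repr(sorted(event.items())).encode("utf-8")
    doc_id = hashlib.sha256(body).hexdigest()
    # Derive the daily index from the event's own @timestamp (epoch seconds),
    # not from the wall clock, so a copy sent after midnight still hits the
    # index of the day it belongs to.
    day = datetime.fromtimestamp(event["@timestamp"], tz=timezone.utc).strftime("%Y.%m.%d")
    return f"logs-{day}", doc_id

index_name, doc_id = route({"@timestamp": 1563926400, "msg": "a message"})
# -> ("logs-2019.07.24", "<64-character sha256 hex digest>")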

Unfortunately, with rollover one can no longer control the index to which a document is written. This has benefits, such as ensuring that previous indices won't be written to anymore, which is not guaranteed with the approach outlined in the previous paragraph. However, the downside is that users can no longer handle duplicates in their indices: sending the same document before and after a new index is created results in duplicates across the index pattern. The main practical implication is that these users can't use ILM.

Is there something we can do to enable these users to use ILM?

Labels: :Core/Features/ILM+SLM, >feature, high hanging fruit

All 7 comments

Pinging @elastic/es-core-features

This is a frequent problem when using systems that provide at-least-once delivery semantics, like Kafka. Rollover brings significant benefits compared to regular time-based indices, so we would like to make it work for this use-case, but it is challenging. One idea could be to look up whether the document already exists in the index from the previous time frame, but this could slow down indexing significantly, given that id lookups are a significant part of the indexing cost. The idea of moving the deduplication logic to Logstash was also raised, but it is likely not applicable, as it relies on the assumption that all documents go through a single Logstash instance.

Maybe we could actually implement this idea of deleting in the previous index and creating in the current index. The trick would be to compute ids in such a way that ids from the current time frame are unlikely to share a prefix with ids from the previous time frame, so that deletions in the previous time frame run without going to disk.

For instance, if we prefixed the hash with the number of minutes since the epoch, then about one minute after we move to a new time frame, deletions in the previous time frame would likely no longer need to go to disk?
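
For illustration, a small sketch of such an id scheme in Python, together with the bulk actions a shipper could send (the index name, write alias and hash choice are illustrative, not an existing API):

import hashlib

def make_id(event_json: str, event_ts: float) -> str:
    # Prefix the content hash with the number of minutes since the epoch, so
    # that ids minted in the current time frame share a prefix that ids from
    # the previous time frame do not.
    minutes = int(event_ts // 60)
    digest = hashlib.sha256(event_json.encode("utf-8")).hexdigest()
    return f"{minutes}-{digest}"

# For every event, the shipper would pair a delete in the previous backing
# index with an index into the current write alias, e.g. as _bulk actions:
#
#   { "delete": { "_index": "logs-000001", "_id": "<id>" } }
#   { "index":  { "_index": "logs_write",  "_id": "<id>" } }
#   { ... event source ... }
#
# Shortly after rollover, the deletes target ids whose minute prefix does not
# exist in the previous index, so they would likely be resolved without going
# to disk, as described above.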

I talked with a user who would like this feature this morning. Some data points:

  • they need this feature because they use Kafka in a way that only guarantees at-least-once delivery, so there might be duplicates - I believe this is a common situation
  • ids are computed by a custom data shipper, but it works fairly similarly to Logstash's fingerprint filter
  • at times they also used this to replay indexing of a subset of the logs
  • they plan to allow up to 30 days between the timestamp of the event and when it gets indexed (which means we might have to delete in lots of indices if applying the above idea)

Some of the use-cases that suffer from this issue are also ones where indexing speed is key, so they are often tuned for fast indexing and accept tradeoffs to get it (longer refresh intervals, async translog durability, etc.). That makes me think that anything we introduce that impacts indexing speed could be hard to adopt in those cases.

Maybe we can think of a "lazy deduplication" approach, where we tolerate duplicates for short life-spans but have another independent process responsible for deleting them in past (rolled over) indices based on document ids. To help reduce the management overhead, this could be linked with the rollover task itself and triggered by a successful rollover action, e.g.:

POST /logs_write/_rollover 
{
  "conditions": {
    "max_age":   "7d",
    "max_docs":  1000,
    "max_size":  "5gb"
  },
  "settings": {
    "lazy_deduplication":   true
  }
}
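
The lazy_deduplication setting in the request above is a proposal rather than an existing setting, but the background pass it describes could be sketched today with the regular APIs and the Python client (index names and connection details are illustrative): after a rollover, take the ids present in the current index and issue deletes for those ids against the already rolled-over indices.

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch("http://localhost:9200")

current_index = "logs-000003"                   # index currently receiving writes
older_indices = ["logs-000001", "logs-000002"]  # already rolled over

# Collect the ids present in the current index...
current_ids = [hit["_id"] for hit in helpers.scan(es, index=current_index)]

# ...and delete those ids from the older indices. Deleting an id that does not
# exist there simply comes back as not_found, and with a prefixed-id scheme
# like the one above most of these deletes should not even need to go to disk.
actions = (
    {"_op_type": "delete", "_index": idx, "_id": doc_id}
    for idx in older_indices
    for doc_id in current_ids
)
helpers.bulk(es, actions, raise_on_error=False)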

Maybe it can be a good enough solution for some use-cases?

@Ryado I think that the fact that duplicates might be spread across multiple indices makes it a bit tricky to implement efficiently?

Maybe it makes sense to throw duplicates out at query time, during aggregation, as is implemented in Cassandra.

Something like this:

# Documents spread across three rolled-over indices; the document with
# fingerprint "a" was indexed twice, into two different indices.
d = {
    "my_index-000001": [
        {"@t": 0, "msg": "a message", "fingerprint": "a"},
        {"@t": 0, "msg": "b message", "fingerprint": "b"},
        {"@t": 0, "msg": "c message", "fingerprint": "c"},
    ],
    "my_index-000002": [
        {"@t": 0, "msg": "d message", "fingerprint": "d"},
    ],
    "my_index-000003": [
        {"@t": 0, "msg": "e message", "fingerprint": "e"},
        {"@t": 0, "msg": "a message", "fingerprint": "a"},  # duplicate
    ],
}

# Keep one document per fingerprint, whichever index it came from.
deduplicated = list({doc["fingerprint"]: doc for docs in d.values() for doc in docs}.values())

# deduplicated ==
# [{'@t': 0, 'msg': 'a message', 'fingerprint': 'a'},
#  {'@t': 0, 'msg': 'b message', 'fingerprint': 'b'},
#  {'@t': 0, 'msg': 'c message', 'fingerprint': 'c'},
#  {'@t': 0, 'msg': 'd message', 'fingerprint': 'd'},
#  {'@t': 0, 'msg': 'e message', 'fingerprint': 'e'}]

