Say your data shippers sometimes send duplicates. With time-based indices it used to be possible to remove duplicates by using Logstash's fingerprint filter to set the document _id to a hash of the document's content, and by indexing into the index that matched the value of the @timestamp field. That way two copies of the same document would land in the same index, even if one was sent before midnight UTC and the other after midnight, when a new daily index is created.
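For illustration, a minimal sketch of that routing (in Python rather than a Logstash pipeline; the index name pattern and field handling here are assumptions):

import hashlib
from datetime import datetime, timezone

def route(event_json: bytes, event_timestamp_millis: int):
    # Document _id: a hash of the event's content, the role played by
    # Logstash's fingerprint filter.
    doc_id = hashlib.sha256(event_json).hexdigest()
    # Target index: derived from the event's @timestamp rather than the wall
    # clock, so a duplicate sent after midnight UTC still goes to the same
    # daily index as the original.
    day = datetime.fromtimestamp(event_timestamp_millis / 1000, tz=timezone.utc)
    return f"logs-{day:%Y.%m.%d}", doc_id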
Unfortunately, with rollover one can no longer control which index a document is written to. This has benefits, such as ensuring that previous indices won't be written to anymore, which is not guaranteed with the approach outlined in the previous paragraph. The downside is that users can no longer handle duplicates this way: sending the same document before and after a new index is created results in duplicates across the index pattern. The main practical implication is that these users can't use ILM.
Is there something we can do to enable these users to use ILM?
Pinging @elastic/es-core-features
This is a frequent problem when using systems that provide at-least-once delivery semantics, like Kafka. Rollover brings significant benefits compared to regular time-based indices, so we would like to make it work for this use-case, but it is challenging. One idea could be to look up whether the document already exists in the index from the previous time frame, but this could slow down indexing significantly given that id lookups are a significant part of the indexing process. The idea of moving the deduplication logic to Logstash was also raised, but it is likely not applicable since it relies on the assumption that all documents go through a single instance.
Maybe we could actually implement this idea of deleting in the previous index and creating in the current index. The trick would be to compute ids in such a way that ids from the current time frame are unlikely to share a prefix with ids from the previous time frame, so that deletions in the previous time frame run without going to disk.
For instance if we prefixed the hash by the number of minutes since Epoch, then about one minute after we move to a new time frame, deletions in the previous time frame would likely not need to go to disk anymore?
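A minimal sketch of such an id scheme, assuming the minute prefix is taken from the event's own @timestamp rather than the ingest time, so that a re-sent duplicate still produces the same id:

import hashlib

def make_id(event_timestamp_millis: int, serialized_event: bytes) -> str:
    # Minutes since the Epoch, derived from the event's @timestamp so that a
    # duplicate of the same event always produces the same id.
    minute_prefix = event_timestamp_millis // 60_000
    # Content fingerprint, the same role Logstash's fingerprint filter plays.
    fingerprint = hashlib.sha256(serialized_event).hexdigest()
    # Ids from the current time frame share a minute prefix that the previous
    # index has likely never seen, so a delete-by-id there could be rejected
    # without touching disk.
    return f"{minute_prefix}-{fingerprint}"

# The same event sent twice yields the same id.
event = b'{"@timestamp": "2019-07-01T00:00:05Z", "msg": "a message"}'
assert make_id(1561939205000, event) == make_id(1561939205000, event)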
I talked with a user who would like this feature this morning. Some data points:
Some of the use-cases that suffer from this issue are also the ones where indexing speed is key. Users often optimize for fast indexing and accept tradeoffs to get it (longer refresh intervals, async translog durability, etc.), which makes me think that anything we introduce that impacts indexing speed could be hard to adopt in those cases.
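For context, those throughput-oriented tradeoffs usually boil down to index settings along these lines (values are only illustrative), e.g. via the Python client:

from elasticsearch import Elasticsearch

es = Elasticsearch()
# Refresh less often and relax translog durability, trading search visibility
# and some durability for indexing speed.
es.indices.put_settings(
    index="logs-*",
    body={
        "index": {
            "refresh_interval": "30s",
            "translog.durability": "async",
        }
    },
)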
Maybe we can think of a "lazy deduplication" approach, where we tolerate duplicates for a short time but have another, independent process responsible for deleting them in past (rolled over) indices based on document ids. To help reduce the management overhead, this could be linked to the rollover task itself and triggered by a successful rollover action (a sketch of such a pass follows the example below), e.g.:
POST /logs_write/_rollover
{
  "conditions": {
    "max_age": "7d",
    "max_docs": 1000,
    "max_size": "5gb"
  },
  "settings": {
    "lazy_deduplication": true
  }
}
Maybe it can be a good enough solution for some use-cases?
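For concreteness, a minimal sketch of what such a post-rollover pass could look like, assuming document ids are content fingerprints and using the Python Elasticsearch client (index names are placeholders; a real implementation would run some time after rollover, batch the deletes, and restrict the scan to the rollover window):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()

def lazy_dedup(rolled_over_index: str, current_index: str) -> None:
    # Walk the ids of the current index; any id that also exists in the
    # rolled-over index is a duplicate of an event re-sent after rollover,
    # so the older copy is deleted and the one in the current index is kept.
    for hit in helpers.scan(es, index=current_index,
                            query={"query": {"match_all": {}}}, _source=False):
        if es.exists(index=rolled_over_index, id=hit["_id"]):
            es.delete(index=rolled_over_index, id=hit["_id"])

# Hypothetical trigger once my_index-000001 has been rolled over:
lazy_dedup("my_index-000001", "my_index-000002")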
@Ryado I think that the fact that duplicates might be spread across multiple indices makes it a bit tricky to implement efficiently?
Maybe it would make sense to throw duplicates out during aggregation, as is done in Cassandra. Something like this:
d = {
    "my_index-000001": [
        {"@t": 0, "msg": "a message", "fingerprint": "a"},
        {"@t": 0, "msg": "b message", "fingerprint": "b"},
        {"@t": 0, "msg": "c message", "fingerprint": "c"},
    ],
    "my_index-000002": [{"@t": 0, "msg": "d message", "fingerprint": "d"}],
    "my_index-000003": [
        {"@t": 0, "msg": "e message", "fingerprint": "e"},
        {"@t": 0, "msg": "a message", "fingerprint": "a"},
    ],
}

In : list({doc["fingerprint"]: doc for k, v in d.items() for doc in v}.values())
Out:
[{'@t': 0, 'msg': 'a message', 'fingerprint': 'a'},
 {'@t': 0, 'msg': 'b message', 'fingerprint': 'b'},
 {'@t': 0, 'msg': 'c message', 'fingerprint': 'c'},
 {'@t': 0, 'msg': 'd message', 'fingerprint': 'd'},
 {'@t': 0, 'msg': 'e message', 'fingerprint': 'e'}]
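One way Elasticsearch can already deduplicate at query time is field collapsing on the fingerprint field; a sketch, assuming fingerprint is indexed as a keyword:

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Collapse hits on the fingerprint so each duplicate group is returned once,
# across every index matched by the pattern.
resp = es.search(
    index="my_index-*",
    body={
        "query": {"match_all": {}},
        "collapse": {"field": "fingerprint"},
    },
)
for hit in resp["hits"]["hits"]:
    print(hit["_source"]["msg"])

Note that collapsing only affects hits; aggregations would still see the duplicates, so counts would need something like a cardinality aggregation on the fingerprint instead.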