Hi there!
I've been trying to use CDC in Scylla as an alternative to DynamoDB Streams. One valuable feature is missing: the expiration event.
For example, when I insert a record in Scylla with ttl=10s, a new entry appears in the CDC log table saying that the ttl=10s.
However, there is no new entry after 10s saying that the record is now expired.
Dynamo notifies about the expiration events and marks them as dynamo-issued events.
Is it possible to have the same feature in Scylla CDC?
Hi @melgenek,
This is an interesting idea, but it will be impossible to report the expiration event at the right time. The way Scylla works, it expires records based on TTL on the fly while reading or compacting. That means the operation is lazy and no action is performed at the exact time of expiration. Would an entry added to the CDC log much later than the expiration time be of any use to you, @melgenek?
We could add the expiration event (with the expiration time serving as the CDC record time key) when the data is inserted. If the data is later modified, we'd need to delete the expiration event.
We'd need to prevent these future records from being seen, perhaps by force-adding a WHERE clause to select statements that read from CDC to hide these future events.
It may wreak havoc with TWCS though.
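To make the proposal above concrete, here is a minimal in-memory sketch in Python. It is not Scylla code: `SketchLog`, its tuple layout, and the millisecond timestamps are all assumptions made for illustration. A write with a TTL also appends a future-dated expiration entry, a later write to the same key drops the pending expiration, and reads hide anything dated after the current time.

```python
class SketchLog:
    """Toy stand-in for a CDC log; entries are (time_ms, operation, key) tuples."""

    def __init__(self):
        self.entries = []

    def write(self, key, now_ms, ttl_ms=None):
        # A newer write supersedes any expiration that has not happened yet.
        self.entries = [
            e for e in self.entries
            if not (e[2] == key and e[1] == "expire" and e[0] > now_ms)
        ]
        self.entries.append((now_ms, "write", key))
        if ttl_ms is not None:
            # Future-dated entry: its time key is the expiration time.
            self.entries.append((now_ms + ttl_ms, "expire", key))

    def read(self, now_ms):
        # Stand-in for the forced WHERE clause: hide future-dated entries.
        return sorted(e for e in self.entries if e[0] <= now_ms)


log = SketchLog()
log.write("a", now_ms=0, ttl_ms=10_000)
log.write("a", now_ms=5_000, ttl_ms=10_000)  # rewrite replaces the pending expiration
```

Reading at 12 s shows only the two writes; the single surviving expiration entry becomes visible once the clock reaches 15 s.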
@melgenek this is a correct observation. In issue #5060 we describe the issue of being 100% compatible with DynamoDB Streams (for Scylla's DynamoDB API compatibility, a.k.a. Alternator), and indeed the issue that DynamoDB Streams report item expiration and our CDC does not prevents us from just using Scylla's TTL to represent DynamoDB's TTL.
As @haaawk noted, Scylla "discovers" that data has expired while "compacting" data - which can happen when reading the data or during automatic background compaction. Beyond the fact that this happens too late (as @haaawk mentioned), another problem is that each replica may notice this expiration event separately, and our CDC replication model cannot allow different replicas to write the same event to the log at very different (more than a slack defaulting to 10 seconds) times.
@avikivity made a very interesting suggestion above - of inserting future-dated expiration events into the CDC log when the expiration time is set. This will complicate a lot of things, but sounds like a plausible solution. With this solution in place, the CDC log will normally end with a long list of future-dated expiration events, but that's not really a problem because the part of the log that we do need to read is still contiguous, so it's still efficient to read.
Another option to work around this missing feature _today_ is to do what people did before DynamoDB added its TTL feature: run a background process which scans the data periodically (DynamoDB documentation suggests expiration may be delayed by 48 hours) and, when it sees an expired item, deletes it with a normal delete, which will appear in the CDC log.
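A rough sketch of such a background scan, in Python, assuming the expiration time is stored as an ordinary attribute of each item rather than as a TTL. The dict-based `table`, the `expires_at` field, and the `cdc_log` list are hypothetical stand-ins; a real scanner would page through the table and issue regular deletes through a driver.

```python
def expire_scan(table, cdc_log, now):
    """Delete items whose expiration time has passed and log each delete."""
    expired = [
        key for key, item in table.items()
        if item.get("expires_at") is not None and item["expires_at"] <= now
    ]
    for key in expired:
        del table[key]                        # a normal delete...
        cdc_log.append(("delete", key, now))  # ...which shows up in the log
    return expired


table = {
    "a": {"value": 1, "expires_at": 10},
    "b": {"value": 2, "expires_at": 100},
    "c": {"value": 3, "expires_at": None},  # no expiration set
}
cdc_log = []
removed = expire_scan(table, cdc_log, now=50)
```

The delete is logged at scan time, not at the item's nominal expiration time, so consumers see the event with a delay bounded by the scan period.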
> We could add the expiration event (with the expiration time serving as the CDC record time key) when the data is inserted.

We cannot write arbitrarily far into the future because we don't know what CDC generation is going to be operating at that time.
> > We could add the expiration event (with the expiration time serving as the CDC record time key) when the data is inserted.
>
> We cannot write arbitrarily far into the future because we don't know what CDC generation is going to be operating at that time.

Good point.
The reason we have this generation mechanism in the first place is to allow synchronous writes to the CDC log - which will always be on the same node (and even shard) as the base table. Maybe we should additionally have an asynchronous write - to a different partition (which doesn't change with generations) for those future-dated deletions. It won't be trivial though (and we need to be careful not to reproduce the problems we have in materialized views with asynchronous updates), and it will also have a performance impact.
How would we know which node will be a replica of this future-dated write, @nyh?
Just to clarify my use case: the CDC events would be used to update some counters, so I need the expiration events in order to decrement those counters.
> Would an entry added to the CDC log much later than the expiration time be of any use to you, @melgenek?

@haaawk based on my use case, yes, it's good enough to have the event eventually.
> The reason we have this generation mechanism in the first place is to allow synchronous writes to the CDC log - which will always be on the same node (and even shard) as the base table. Maybe we should additionally have an asynchronous write - to a different partition (which doesn't change with generations) for those future-dated deletions.

One problem with writing these future-dated deletions to a different partition is that you lose the sequential view of changes: the client would now have to inspect both the stream IDs and those different partitions, and merge the ordered lists to recover the sequential view. Also, you say that these different partitions don't change with generations. OK, but how do we make them scale? How do we ensure that these partitions are uniformly distributed across the cluster? This is the problem solved by stream ID sets that change with topology changes. The number of stream IDs is large enough to achieve uniform distribution (one per vnode per shard), but not larger than necessary, so the client can query them all.
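To illustrate the extra client-side work described above, here is a small Python sketch of merging two time-ordered lists back into one sequential view. The `(time, operation, key)` tuple layout and the function name `merged_view` are assumptions for illustration only; they do not reflect the actual CDC log schema.

```python
import heapq


def merged_view(stream_entries, expiration_entries):
    """Merge two time-ordered lists of (time, operation, key) into one."""
    return list(
        heapq.merge(stream_entries, expiration_entries, key=lambda e: e[0])
    )


stream_entries = [(1, "insert", "a"), (4, "insert", "b")]
expiration_entries = [(3, "expire", "a"), (9, "expire", "b")]
combined = merged_view(stream_entries, expiration_entries)
```

Every consumer would need some form of this merge, and it only works if both inputs are already sorted by time, which is part of why the separate-partition design is less convenient than a single log.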
> How would we know which node will be a replica of this future-dated write, @nyh?

The future-dated deletions can be on a separate partition, which does not "belong" to any particular node (which, as I said, will make this write cross nodes and be slower and probably asynchronous). Readers will need to read that partition too. There can be more than one of those partitions to allow high throughput. There is a problem that we have entries for the same row (creation and deletion) in two different CDC log partitions, but if we assume that expiration is a long time in the future, this shouldn't be a problem because any reasonable client would see the creation first and only then the deletion.
Yes, I realize how ugly this sounds... For Alternator, I was really thinking of using a background process, not tricks of this type.
> > How would we know which node will be a replica of this future-dated write, @nyh?
>
> The future-dated deletions can be on a separate partition, which does not "belong" to any particular node (which, as I said, will make this write cross nodes and be slower and probably asynchronous). Readers will need to read that partition too. There can be more than one of those partitions to allow high throughput. There is a problem that we have entries for the same row (creation and deletion) in two different CDC log partitions, but if we assume that expiration is a long time in the future, this shouldn't be a problem because any reasonable client would see the creation first and only then the deletion.
>
> Yes, I realize how ugly this sounds... For Alternator, I was really thinking of using a background process, not tricks of this type.

Then let's make it a separate table and that way we wouldn't need to create special infrastructure ;).
You would still need to create/instruct users on how to retrieve it. Can't we store the future-deletes in a separate table, note the time for expected delivery, and have a per-shard reaper task that gets updated to wake up on the first timeout, query and delete the expiry rows in this table within the current window, and then write them to CDC proper?
On each CDC write to a node, if a TTL is set, we update the reaper timer to maybe wake up sooner, then rinse and repeat.
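A minimal Python sketch of the reaper idea, assuming a min-heap stands in for the separate expiry table and a plain list stands in for the CDC log. The `Reaper` class and its method names are hypothetical, and the sketch ignores persistence, sharding, and rewrites that change an item's TTL.

```python
import heapq


class Reaper:
    """Toy per-shard reaper: tracks pending expirations in a min-heap."""

    def __init__(self):
        self._pending = []  # heap of (expire_at, key)

    def schedule(self, key, expire_at):
        # Called on each write that carries a TTL.
        heapq.heappush(self._pending, (expire_at, key))

    def next_wakeup(self):
        # The timer should fire at the earliest pending expiration, if any.
        return self._pending[0][0] if self._pending else None

    def run(self, now, cdc_log):
        # Move every expiration that is due into the log proper.
        while self._pending and self._pending[0][0] <= now:
            _, key = heapq.heappop(self._pending)
            cdc_log.append((now, "expire", key))


reaper = Reaper()
reaper.schedule("a", expire_at=30)
reaper.schedule("b", expire_at=10)  # earlier TTL pulls the wakeup time forward
first_wakeup = reaper.next_wakeup()
cdc_log = []
reaper.run(now=20, cdc_log=cdc_log)
```

Because the event is written when the reaper runs, it is stamped with the current time, so it lands in whatever CDC generation is active at that moment instead of being written far into the future.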
Another possible implementation:
When compaction on one replica expires a TTLed cell, it can send the other replicas a command (similar to a coordinator write) which will cause the cell to be immediately tombstoned there, along with the appropriate writes to their CDC logs. A difficult question is how to do this reliably: if we want QUORUM reads from the CDC log to work, we need the compaction to insist that QUORUM replicas receive the delete (including this replica) - but it's not clear what we can do if we can't achieve this QUORUM.
UPDATE: There's a problem that might require us to delay expiration and do frequent repairs. If a user rewrites existing data with a new, later expiration time, it is possible for one of the replicas to miss the update and still think the old (earlier) expiration time is in place. It will then expire the item too soon and write the wrong expiration event to its own CDC log, and even if the other nodes do not agree, repairing the CDC log will eventually spread the wrong event. This might mean that when we decide to expire an item we need to do a QUORUM read - not just a QUORUM write - to make sure the expiration is valid. Another option is to always delay expiration by at least GC grace seconds (the repair frequency) after the original expiration time. This is the same thing we do today with tombstones.
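The delay option amounts to a simple time check, sketched below in Python. The function name `may_emit_expiration` is hypothetical, and the 10-day value is only an assumed example for the table's `gc_grace_seconds` setting.

```python
GC_GRACE_SECONDS = 864_000  # 10 days; assumed example value


def may_emit_expiration(now_s, expires_at_s, gc_grace_s=GC_GRACE_SECONDS):
    """True once the item has been expired for at least gc_grace_s seconds."""
    return now_s >= expires_at_s + gc_grace_s


too_early = may_emit_expiration(now_s=1_000, expires_at_s=1_000)
late_enough = may_emit_expiration(now_s=1_000 + GC_GRACE_SECONDS, expires_at_s=1_000)
```

The reasoning is that, if repairs run at least once per grace period, a replica that missed a TTL-extending rewrite should have received it before it is allowed to emit the expiration event.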
May I ask if this is planned and/or will be added in a particular release?
We're thinking about some potential solutions but it's not on the roadmap yet.