Ksql: Documentation on 'convert a changelog to a table' should clarify how tombstones are handled

Created on 18 Jun 2020 · 14 comments · Source: confluentinc/ksql

The advice in https://github.com/confluentinc/ksql/blob/master/docs/how-to-guides/convert-changelog-to-table.md on how to convert a changelog into a table uses a flawed pattern.

It suggests using the following:

CREATE TABLE t1 AS
    SELECT k,
           LATEST_BY_OFFSET(v1) AS v1,
           LATEST_BY_OFFSET(v2) AS v2,
           LATEST_BY_OFFSET(v3) AS v3
    FROM s1
    GROUP BY k
    EMIT CHANGES;

This does not correctly convert a changelog stream into a table, because it does not honour tombstones. It would create a table t1 that would _never_ do any DELETEs, only UPSERTs.

We should not be suggesting this as a good pattern!

This can be tested using the following QTT test:

{
  "name": "materialize changelog",
  "statements": [
    "CREATE STREAM INPUT (ID BIGINT KEY, F0 INT) WITH (kafka_topic='test_topic', value_format='JSON');",
    "CREATE TABLE OUTPUT AS SELECT ID, LATEST_BY_OFFSET(F0) AS F0 FROM INPUT GROUP BY ID;"
  ],
  "inputs": [
    {"topic": "test_topic", "key": 0, "value": {"F0": 1}},
    {"topic": "test_topic", "key": 0, "value": {"F0": 2}},
    {"topic": "test_topic", "key": 0, "value": null},
    {"topic": "test_topic", "key": 1, "value": {"F0": 10}},
    {"topic": "test_topic", "key": 0, "value": {"F0": 3}}
  ],
  "outputs": [
    {"topic": "OUTPUT", "key": 0, "value": {"F0": 1}},
    {"topic": "OUTPUT", "key": 0, "value": {"F0": 2}},
    {"topic": "OUTPUT", "key": 0, "value": null},
    {"topic": "OUTPUT", "key": 1, "value": {"F0": 10}},
    {"topic": "OUTPUT", "key": 0, "value": {"F0": 3}}
  ]
}

The above test fails because the OUTPUT topic doesn't contain the third expected output row: the tombstone corresponding to the tombstone in the input topic.
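To make the failure easier to see without running QTT, here is a small Python model of the same five input records. It is only a sketch: it assumes, as the failing test suggests, that null-valued records are dropped before they reach LATEST_BY_OFFSET, and it ignores ksqlDB internals such as serialization and state stores. `INPUTS` and `aggregate_latest_by_offset` are hypothetical names used for illustration.

```python
from typing import Optional

# Hypothetical encoding of the QTT inputs: (key, value) pairs where a value
# of None stands for a tombstone.
INPUTS: list[tuple[int, Optional[dict]]] = [
    (0, {"F0": 1}),
    (0, {"F0": 2}),
    (0, None),
    (1, {"F0": 10}),
    (0, {"F0": 3}),
]


def aggregate_latest_by_offset(records):
    """Simplified model of GROUP BY ID + LATEST_BY_OFFSET(F0).

    Assumption: null-valued input records are dropped before aggregation,
    so the aggregate never emits a tombstone and never deletes a row.
    Returns (emitted output rows, final aggregate state).
    """
    state: dict[int, dict] = {}
    outputs = []
    for key, value in records:
        if value is None:
            continue  # the tombstone never reaches the aggregation
        state[key] = value  # latest value by offset wins
        outputs.append((key, value))
    return outputs, state


actual, _ = aggregate_latest_by_offset(INPUTS)
expected = INPUTS  # the test expects one output row per input row, tombstone included
missing = [row for row in expected if row not in actual]
print(len(actual), len(expected))  # 4 5
print(missing)  # [(0, None)]
```

In this model the final aggregate state happens to match the expected table only because key 0 is re-inserted after the tombstone; without that last record, key 0 would stay in the aggregate indefinitely.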

A changelog topic can be converted to a table/MV using:

CREATE TABLE t1 (
    ID BIGINT PRIMARY KEY,
    v1 INT,
    v2 STRING,
    v3 BOOLEAN
  ) WITH (
    kafka_topic='t1-changelog',
    value_format='Json'
  );

Tombstones are correctly handled by the above statement. Of course, the result is not materialized into RocksDB, so it is not queryable through a pull query.
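For contrast, here is a sketch of the semantics a table read from a changelog topic is expected to have: a non-null value upserts the row and a null value deletes it. This is a hypothetical Python model (`apply_changelog` is illustrative, not ksqlDB code), replayed over the same five records as the QTT test, taking a snapshot of the table after each record.

```python
from typing import Optional


def apply_changelog(records):
    """Simplified model of CREATE TABLE over a changelog topic.

    A non-null value upserts the row for its key; a null value (tombstone)
    deletes it. Returns a snapshot of the table after each record.
    """
    table: dict[int, dict] = {}
    snapshots = []
    for key, value in records:
        if value is None:
            table.pop(key, None)  # tombstone: delete the row if present
        else:
            table[key] = value  # upsert
        snapshots.append(dict(table))
    return snapshots


records: list[tuple[int, Optional[dict]]] = [
    (0, {"F0": 1}),
    (0, {"F0": 2}),
    (0, None),
    (1, {"F0": 10}),
    (0, {"F0": 3}),
]
snapshots = apply_changelog(records)
print(snapshots[2])   # {}  (key 0 was deleted by the tombstone)
print(snapshots[-1])  # {1: {'F0': 10}, 0: {'F0': 3}}
```

After the third record the modelled table is empty, which is exactly the intermediate state the aggregation-based approach never produces.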

Unfortunately, I don't believe there is any way with ksqlDB, as it stands, to force it to materialize the table so that you can issue pull queries against it, while maintaining correct tombstone handling.

So... we either fix the doc, or make this possible. We should definitely remove this as a recommendation in the short term.

Fixing this probably isn't trivial. I'm not sure importing a changelog as a stream and then aggregating it into a table makes sense. @mjsax probably has a view on this too.

One potential fix would be to introduce syntax to allow the user to say they want the table materialized, e.g.

CREATE TABLE t1 (
    ID BIGINT PRIMARY KEY,
    v1 INT,
    v2 STRING,
    v3 BOOLEAN
  ) WITH (
    kafka_topic='t1-changelog',
    value_format='Json',
    materialize=true
  );

Or to have ksqlDB auto-magically materialize it on the first pull query...



@big-andy-coates We talked this over at length both when we implemented LATEST_BY_OFFSET and when we wrote the guide. We wrote this piece of documentation because people constantly ask how to do exactly this. I'll update the docs to clarify that this doesn't handle tombstones, but that's a known limitation and not a bad pattern to use.

I'm struggling to see how this is not a bad pattern.

  1. The result is wrong: it will end up containing rows it shouldn't. You can't say this is converting a 'changelog' into a table if it's ignoring tombstones. The only meaningful difference between a changelog and any other topic is that nulls are interpreted as tombstones.

  2. Anyone making use of such a pattern will likely see their RocksDB state bloat over time, and their ksql nodes will eventually run out of disk space and crash.
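Point 2 can be illustrated with a toy model in which in-memory dicts stand in for the RocksDB state. It assumes every key is written once and then tombstoned; `state_sizes` is a hypothetical helper, not ksqlDB code, and real disk usage also depends on compaction and serialization.

```python
def state_sizes(num_keys: int):
    """Write each key once, then tombstone it, and report how many keys
    each simplified model still retains: (aggregation, changelog table)."""
    records = [(k, {"F0": k}) for k in range(num_keys)]
    records += [(k, None) for k in range(num_keys)]

    agg_state: dict[int, dict] = {}    # models GROUP BY + LATEST_BY_OFFSET
    table_state: dict[int, dict] = {}  # models CREATE TABLE over the changelog
    for key, value in records:
        if value is None:
            table_state.pop(key, None)  # only the table honours the delete
            continue                    # the aggregation drops the tombstone
        agg_state[key] = value
        table_state[key] = value
    return len(agg_state), len(table_state)


print(state_sizes(1000))  # (1000, 0)
```

In the model, every deleted key stays in the aggregation state forever, so its size grows with the number of distinct keys ever seen rather than with the number of live rows.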

How can we be promoting this as a good pattern to use?

Yes, lots of people ask how to do this. But surely what we should be doing is providing a way for them to do this that works, not documenting and promoting something we _know_ doesn't...

I've found that often people asking things like 'how do I convert a stream of IOT sensor readings into a table' have not tried just using a CREATE TABLE statement! Yet the page doesn't suggest this as the first, easiest and most correct solution.

At the very least we should be suggesting people use a simple CREATE TABLE statement, and only suggesting this hack/trick if they don't have any tombstones, i.e. their topic contains a stream of keyed values, not a changelog with tombstones. The title probably also needs updating. We should definitely call out the pitfalls of this approach if your topic is actually a changelog. We should only use the term 'changelog' when referring to a topic containing an actual changelog, not just one with keyed values.

> I'm struggling to see how this is not a bad pattern.

Let's connect offline. We discussed this quite a lot in PRs, design docs, and live discussions and came to the conclusion that it was okay to skip tombstones for now.

> I've found that often people asking things like 'how do I convert a stream of IOT sensor readings into a table' have not tried just using a CREATE TABLE statement! Yet the page doesn't suggest this as the first, easiest and most correct solution.

People balk at this because it isn't usable with pull queries. It's also not a great solution if you're building it off an existing stream.

We did discuss the topic at length when latest_by_offset was added, and I did raise concerns, but the majority thought that not being able to handle tombstones was an acceptable limitation. And maybe that is ok.

_However_, the documentation of the feature seems to be quite misleading, starting with the title. The term "changelog" is not appropriate to describe how it works. In the end, the result is semantically an "aggregation over a _fact/record_ stream". The only way one can read a changelog stream as a table is by writing the stream into a topic and reading it back via CREATE TABLE. The documentation should contrast both features and point out the differences.

In general, I think we should allow people to upsert a (changelog) stream into a table directly in ksqlDB, e.g., as proposed in KLIP-11 via a table(<stream>) operator (including proper tombstone handling); of course, the table operator is just one idea and there might be other ways to achieve the same thing. However, tombstones in _streams_ still raise a couple of open questions and I am not 100% sure how we need to handle them end-to-end. Note that in Kafka Streams we recently added a KStream#toTable() operator for the exact same purpose. And we added toTable() because aggregating a record stream is just not the same operation: the two are semantically different and we should not conflate them.

> The documentation should contrast both features and point out the differences.

Yes, I think this is key.

> However, tombstones in streams still raise a couple of open questions and I am not 100% sure how we need to handle them end-to-end

Me neither. But we can discuss this offline.

@big-andy-coates I think a WITH(materialize=true, ...) option would be very useful and also a good solution to this problem. I've filed #5652 to capture this work.

While this might be related, the issue we are discussing here is how tombstones are handled. WITH(materialize=true) would not solve that; it addresses a different issue.

> While this might be related, the issue we are discussing here is how tombstones are handled. WITH(materialize=true) would not solve that; it addresses a different issue.

@mjsax to clarify, what is meant here is that materialize=true would allow users to materialize a table in a way that respects tombstones (as they currently work).

I don't think that would be a good solution. The aggregation operator itself drops input tombstones, so they never make it into the result table. Using a "flag" on the output table to change the semantics of an operator in the query seems to be a bad idea. Furthermore, changing the semantics of the aggregation operator itself seems questionable to begin with.

The idea wouldn't be to use WITH(materialize=true) on an aggregate table in order to hack tombstones in. I agree that that wouldn't be a good idea.

The idea is to use materialize=true on plain tables so that the latest_by_offset method for materialization isn't even necessary, e.g.,

CREATE TABLE t (x INT) WITH (materialize=true, ...)

And then users could issue pull queries against that table since it would be materialized. This new option would have no interaction with aggregation semantics.

Sure, that would work. But this is exactly what my original comment was about: It addresses an orthogonal issue. Your suggestion is not about tombstone handling, but about materialization.

And this ticket is about the lack of documentation of tombstone semantics (and the fact that tombstones are not handled by latest_by_offset). And as mentioned above, how tombstones should be handled end-to-end is an open question.

I guess, we are actually on the same page and just did not fully understand what the other person was saying.

Btw: I am not sure if using a flag like you propose is the best solution to the materialization problem (maybe it's good enough as a quick intermediate solution though). I need to think about it in more detail, but at the moment I tend to think we should switch from CT to CREATE MV syntax and for this case _always_ materialize into RocksDB (and thus we won't need this flag).

+1 for switching to MVs and always materializing... oh wait, I wrote the KLIP for that :p

The WITH(materialized=true) could be a short-term fix. Though thinking about it, it's probably not much more _coding_ to switch the syntax. It's just a lot more documentation updates!

@mjsax @MichaelDrogalis Do we know if there's any documentation work on this ticket? If not, I'll go ahead and close it.

Let's leave this in place for now and update it when @mjsax rolls out his plan for toTable() in the future.
