Ksql: Tombstones not visible in SELECT and PRINT statements

Created on 18 Oct 2019 · 15 Comments · Source: confluentinc/ksql

This test passes (using 5.3.0 and 5.4.0-beta1):

{
  "name": "CTAS filtered by WHERE condition",
  "statements": [
    "CREATE TABLE balances (balance INT) WITH (KAFKA_TOPIC='balances', PARTITIONS=1, VALUE_FORMAT='DELIMITED');",
    "CREATE TABLE whales AS SELECT balance FROM balances WHERE balance > 1000;"
  ],
  "inputs": [
    {"topic": "balances", "key": "account-1", "value": "1001"},
    {"topic": "balances", "key": "account-1", "value": "999"}
  ],
  "outputs": [
    {"topic": "whales", "key": "account-1", "value": "1001"},
    {"topic": "whales", "key": "account-1", "value": null}
  ]
}
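To see where the null in the expected output comes from, here is a minimal Python model of the WHERE balance > 1000 filter applied to the table's changelog. It assumes that a filtered table forwards a tombstone whenever a key's new value fails the predicate. This is a simulation, not KSQL or Kafka Streams code, and the name filter_changelog is hypothetical.

```python
from typing import Callable, Iterable, List, Optional, Tuple

# (key, value); a value of None models a tombstone
Record = Tuple[str, Optional[int]]


def filter_changelog(records: Iterable[Record],
                     predicate: Callable[[int], bool]) -> List[Record]:
    """Hypothetical model of a table filter (assumed semantics).

    Values that pass the predicate are forwarded unchanged. Values that fail
    it, and incoming tombstones, are forwarded as tombstones so that any
    earlier row for the same key is removed downstream.
    """
    out: List[Record] = []
    for key, value in records:
        if value is not None and predicate(value):
            out.append((key, value))
        else:
            out.append((key, None))
    return out


# Inputs from the "balances" topic in the test case above.
balances = [("account-1", 1001), ("account-1", 999)]
whales = filter_changelog(balances, lambda balance: balance > 1000)
print(whales)  # [('account-1', 1001), ('account-1', None)]
```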

Actual behavior

I'm wondering why, in the CLI, the SELECT and PRINT statements do not print the null-value tombstone:

ksql> SELECT * FROM whales;
1571349169547 | account-1 | 1001

Press CTRL-C to interrupt
ksql> PRINT whales FROM BEGINNING;
10/17/19 9:52:49 PM UTC , account-1 , 1001

Press CTRL-C to interrupt

Expected behavior

I was expecting to see the null tombstone in the output, indicating the removal of account-1 from the WHALES table. Intuitively, anything consuming the table would need to receive those updates. Playing around in the CLI threw me off because it doesn't print them.

Labels: P1, bug, null-handling, requires-streams-change

All 15 comments

I don't think this should be labeled a bug; it's more like "unintuitive behaviour". However, I'm not able to remove the label now.

I think I agree with you that this is a bug for the PRINT functionality, which should print exactly what is in the topic. SELECT, however, should only print what is in the table (tombstones are not in the table).

Before I "fix" this, I'd like to get some other people's opinions. cc @rmoff @vcrfxia (because of #3470).

I think showing nulls in PRINT makes sense as long as we're consistent about it (regardless of serialization format). I'm not convinced we shouldn't show nulls for SELECT queries on tables, though. Isn't it important to know that a key has been removed from the table, if SELECT is meant to illustrate a changelog for the table?

> Isn't it important to know that a key has been removed from the table, if SELECT is meant to illustrate a changelog for the table?

Would this come down to whether EMIT CHANGES was specified in the SELECT or not?

> Would this come down to whether EMIT CHANGES was specified in the SELECT or not?

Yes, I was assuming that this was the case. (I thought this was an old GitHub issue, but I now realize it was opened just yesterday.) I agree that if SELECT on a table is issued without EMIT CHANGES, then the output should not contain tombstones.

I'm still unsure/confused as to how one could "track" the removal of a key from a table, if the tombstone is not sent.

If EMIT CHANGES solves this, then that's happy times, but does any version of KSQL support EMIT CHANGES today? Is it behind a feature flag? I tried using it, but 5.3.0 and 5.4.0-beta1 didn't seem to like it.

BTW: I have only tried to do a PoC using the CLI, and I'm assuming the KSQL REST API would behave the same. Hence this issue: how to track the removal of a key (account number) from a table which is filtered by some condition, as in the initial example test case.

I tried running the Docker image confluentinc/cp-ksql-server:5.4.0-dist-1:

SELECT * FROM WHALES EMIT CHANGES;

It does not output the tombstone when an account balance falls to 1000 or less.

What is the consensus, should it be output?

I'm with @naartjie here - if a user is to use SELECT * FROM WHALES EMIT CHANGES to materialize the table in-memory, then they'll need the tombstones!

Likewise, a user issuing the same query in the CLI will currently see INSERTs and UPDATEs to the table, but not DELETEs - this too is wrong.

This needs fixing. However, it depends on / is related to https://github.com/confluentinc/ksql/issues/5303, as we'll always need the key to know _which_ row has been deleted. Hence moving to next milestone.
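A small Python sketch of the point about materializing the table in memory: a client that applies the push query's rows to a dict is left with a stale row for account-1 if the tombstone is never delivered. The materialize function and its drop_tombstones flag are hypothetical, purely for illustration.

```python
from typing import Dict, Iterable, Optional, Tuple

Record = Tuple[str, Optional[int]]  # a value of None models a tombstone


def materialize(changelog: Iterable[Record],
                drop_tombstones: bool = False) -> Dict[str, int]:
    """Apply a table changelog to an in-memory dict (hypothetical client).

    drop_tombstones=True mimics a client that never receives null values.
    """
    table: Dict[str, int] = {}
    for key, value in changelog:
        if value is None:
            if drop_tombstones:
                continue  # the DELETE never reaches the client
            table.pop(key, None)  # remove the row if present
        else:
            table[key] = value  # INSERT or UPDATE
    return table


whales_changelog = [("account-1", 1001), ("account-1", None)]
print(materialize(whales_changelog))                        # {}
print(materialize(whales_changelog, drop_tombstones=True))  # {'account-1': 1001}
```

The key is what makes the tombstone usable: without it, the client has no way to tell which entry to remove.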

Thank you for confirming, @big-andy-coates. After logging this issue, I wasn't quite sure how it had been received.

I've looked into this and found that it's not as simple as removing the if (value == null) return; block from the code in TransientBlockingQueue that's filtering out the tombstones :(. Dang and blast it! ;)

Removing the filter can result in many more tombstones than expected. Consider a future push query test that checks tombstones are sent correctly:

{
  "name": "should send tombstones",
  "statements": [
    "CREATE TABLE INPUT (ID BIGINT PRIMARY KEY, V0 INT) WITH (kafka_topic='test_topic', value_format='JSON');",
    "SELECT * FROM INPUT WHERE ID = 11 EMIT CHANGES LIMIT 2;"
  ],
  "inputs": [
    {"topic": "test_topic", "key": 10, "value": {"v0": 100}},
    {"topic": "test_topic", "key": 11, "value": {"v0": 200}},
    {"topic": "test_topic", "key": 11, "value": null}
  ],
  "responses": [
    {"admin": {"@type": "currentStatus"}},
    {"query": [
      {"header":{"schema":"`ID` BIGINT, `V0` INTEGER"}},
      {"row":{"keys":[11], "values":[200]}},
      {"row":{"keys":[11], "values":null}},
      {"finalMessage":"Limit Reached"}
    ]}
  ]
}

The test feeds in three rows and expects two rows out, both with the matching ID. Unfortunately, the first row out will be:

{"row":{"keys":[10], "values":null}}

What? But I'm not interested in rows with ID of 10!!

The issue here is that the WHERE clause is implemented using a KTable::filter call, which is stateless. It therefore cannot know whether a previous value for the key passed the filter, so it must emit a tombstone to clear any previous output row.
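The behaviour described above can be modelled in a few lines of Python. This is a hypothetical simulation of a stateless filter (the name stateless_filter is made up), assuming it emits a tombstone for every record that fails the predicate because it cannot tell whether an earlier value for that key passed.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

Record = Tuple[int, Optional[Dict[str, Any]]]  # a value of None models a tombstone


def stateless_filter(records: Iterable[Record],
                     predicate: Callable[[int, Dict[str, Any]], bool]) -> List[Record]:
    """Hypothetical model of a stateless table filter."""
    out: List[Record] = []
    for key, value in records:
        if value is not None and predicate(key, value):
            out.append((key, value))
        else:
            # No memory of earlier output: emit a tombstone in case a previous
            # value for this key passed the filter.
            out.append((key, None))
    return out


# Inputs from the push query test above; WHERE ID = 11 filters on the key.
inputs: List[Record] = [(10, {"v0": 100}), (11, {"v0": 200}), (11, None)]
result = stateless_filter(inputs, lambda key, value: key == 11)
print(result)      # [(10, None), (11, {'v0': 200}), (11, None)]
print(result[:2])  # what a LIMIT 2 client would see
```

With LIMIT 2, the first two records the client receives are the tombstone for key 10 and the row for key 11, which is why the test as written cannot pass.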

This is the same underlying issue as https://github.com/confluentinc/ksql/issues/3558, and requires a change in streams to resolve.

Without the Streams change, the client would be inundated with tombstones for every row that doesn't match the WHERE or HAVING clause, which isn't workable. Hence marking as 'requires-streams-change'.
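One way to avoid the flood, sketched in Python under the assumption that the filter is allowed to keep state: remember which keys currently have a row downstream and emit a tombstone only for those. This illustrates the idea only; stateful_filter is a hypothetical name, and this is not the actual Kafka Streams change.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple

Record = Tuple[int, Optional[Dict[str, Any]]]  # a value of None models a tombstone


def stateful_filter(records: Iterable[Record],
                    predicate: Callable[[int, Dict[str, Any]], bool]) -> List[Record]:
    """Hypothetical filter that only retracts rows it previously emitted."""
    present: Set[int] = set()  # keys whose latest output was a non-null row
    out: List[Record] = []
    for key, value in records:
        if value is not None and predicate(key, value):
            present.add(key)
            out.append((key, value))
        elif key in present:
            present.discard(key)
            out.append((key, None))  # retract a row the client actually saw
        # Otherwise the key never produced a row, so there is nothing to delete.
    return out


# Same inputs as the push query test: WHERE ID = 11.
inputs: List[Record] = [(10, {"v0": 100}), (11, {"v0": 200}), (11, None)]
result = stateful_filter(inputs, lambda key, value: key == 11)
print(result)  # [(11, {'v0': 200}), (11, None)]
```

The trade-off is memory: the set grows with the number of distinct keys that currently pass the filter.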

Hey @big-andy-coates, is there an Apache Kafka ticket tracking the change we need in Streams?

> Hey @big-andy-coates, is there an Apache Kafka ticket tracking the change we need in Streams?

Will add one.

Upstream Kafka Streams work for this is now complete: https://github.com/apache/kafka/pull/9156

FYI, PRINT TOPIC does now show tombstones.

ksql> create stream foo (ID INT KEY, VAL INT) with (kafka_topic='foo', value_format='JSON', partitions=1);
ksql> create table bar as select ID, SUM(VAL) from foo group by id having sum(val) > 0;
ksql> insert into foo values (1, 1);
ksql> insert into foo values (1, -2);
ksql> print BAR from beginning limit 2;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/10/12 20:24:41.522 Z, key: 1, value: {"KSQL_COL_0":1}
rowtime: 2020/10/12 20:24:54.307 Z, key: 1, value: <null>
Topic printing ceased
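The two records above can be reproduced with a small Python model of the query: a running SUM(VAL) per key, followed by the HAVING filter, which forwards a tombstone once the sum drops to 0 or below. It assumes KSQL_COL_0 holds the running sum, as the printed value suggests; sum_having is a hypothetical name, not ksqlDB code.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

Output = Tuple[int, Optional[Dict[str, int]]]  # a value of None models a tombstone


def sum_having(inserts: Iterable[Tuple[int, int]], threshold: int = 0) -> List[Output]:
    """Hypothetical model of GROUP BY ID with SUM(VAL) and HAVING SUM(VAL) > threshold."""
    totals: Dict[int, int] = defaultdict(int)
    out: List[Output] = []
    for key, val in inserts:
        totals[key] += val  # update the running aggregate
        total = totals[key]
        if total > threshold:
            out.append((key, {"KSQL_COL_0": total}))
        else:
            out.append((key, None))  # row no longer satisfies HAVING
    return out


# The two INSERT statements from the transcript above.
rows = sum_having([(1, 1), (1, -2)])
print(rows)  # [(1, {'KSQL_COL_0': 1}), (1, None)]
```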

Looking into straight up selects...
