Ksql: KSQL table emits two rows even though state hasn't changed

Created on 21 Jan 2019 · 4 Comments · Source: confluentinc/ksql

To reproduce, see here.

Using a Kafka topic populated by Kafka Connect and Debezium, it is possible to build a KSQL table that reflects the current state of an RDBMS table. Usually a SELECT against the KSQL table will show the current state, and if the SELECT is left running any changes to the RDBMS table (and thus underlying Kafka topic) will be emitted from the KSQL table SELECT.

However, it has been observed (at 44:36 here) that a SELECT can emit two rows from the table even though no changes have been made to the source data.

Step by step - _all works as expected until step 8_

  1. Spin up environment, which configures Debezium and Kafka Connect etc

  2. Create KSQL stream over source topic, change partition key (because it will be used in a subsequent join), create table over re-keyed stream

    SET 'auto.offset.reset' = 'earliest';
    CREATE STREAM ACCOUNTS_STREAM WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='AVRO');
    CREATE STREAM ACCOUNTS_REKEYED WITH (PARTITIONS=1) AS SELECT * FROM ACCOUNTS_STREAM PARTITION BY ACCOUNT_ID;
    -- This select statement is simply to make sure that we have time for the ACCOUNTS_REKEYED topic
    -- to be created before we define a table against it
    SELECT * FROM ACCOUNTS_REKEYED LIMIT 1;
    CREATE TABLE ACCOUNTS WITH (KAFKA_TOPIC='ACCOUNTS_REKEYED',VALUE_FORMAT='AVRO',KEY='ACCOUNT_ID');
    
  3. From KSQL, select the table value for key a42. Do not cancel the query.

    ksql> SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
    a42 | Robin | Moffatt | [email protected] | +44 123 456 789
    
  4. In MySQL, update the EMAIL value:

    mysql> UPDATE accounts SET EMAIL='none' WHERE ACCOUNT_ID='a42';
    Query OK, 1 row affected (0.03 sec)
    Rows matched: 1  Changed: 1  Warnings: 0
    

    Note that in KSQL a new row is emitted from the KSQL table. This is as expected.

    a42 | Robin | Moffatt | none | +44 123 456 789
    
  5. Recap: in KSQL the table has shown the initial state, and then when it changed, the new state:

    a42 | Robin | Moffatt | [email protected] | +44 123 456 789
    a42 | Robin | Moffatt | none | +44 123 456 789
    
  6. Cancel the KSQL table query, and rerun it (do not cancel it once run).

    Just the current state (EMAIL='none') is returned. This is expected:

    ^CQuery terminated
    ksql> SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
    a42 | Robin | Moffatt | none | +44 123 456 789
    
  7. Change the EMAIL value again in MySQL:

    mysql> UPDATE accounts SET EMAIL='[email protected]' WHERE ACCOUNT_ID='a42';
    Query OK, 1 row affected (0.01 sec)
    Rows matched: 1  Changed: 1  Warnings: 0
    

    Note that in KSQL a new row is emitted from the KSQL table. This is as expected.

    a42 | Robin | Moffatt | [email protected] | +44 123 456 789
    
  8. _This is where the divergence between actual and expected behaviour happened._

    As before, cancel the KSQL table query and rerun it.

    • Expected behaviour: a single row is emitted, that of the current state (EMAIL='[email protected]')

      ^CQuery terminated
      ksql> SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
      a42 | Robin | Moffatt | [email protected] | +44 123 456 789
      

      This is what has always been observed previously, and it is what is observed now when trying to reproduce the problem.

    • Observed behaviour

      ^CQuery terminated
      ksql> SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
      a42 | Robin | Moffatt | [email protected] | +44 123 456 789
      a42 | Robin | Moffatt | [email protected] | +44 123 456 789
      

      There is a pause of a few seconds between the two rows being emitted.

      Two things are strange here:

      1. Why are there two rows emitted, even though the state hasn't changed?

        The [email protected] value had already been received and emitted, per step 7 - it is not the case therefore that it was only received whilst this SELECT was running.

      2. The two rows emitted are the first (EMAIL='[email protected]') and third (EMAIL='[email protected]') values of three in total, so the table was not simply showing the prior value followed by the new one. It showed the very first value and then the last, omitting the second.
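
One way to check how many records the table's backing topic actually holds for the key is to print the topic from the beginning. This is a minimal sketch, assuming the topic name ACCOUNTS_REKEYED from step 2; the output is not shown because it was not captured in the original report.

```sql
-- Dump every record in the topic backing the ACCOUNTS table, oldest first.
-- Each UPDATE made in MySQL should appear as a separate record for key a42.
PRINT 'ACCOUNTS_REKEYED' FROM BEGINNING;
```

Three records for a42 in this output would suggest that the second value is present in the topic and is being skipped by the SELECT rather than lost upstream.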

All 4 comments

This is due to the way tables are implemented in KSQL. In KSQL, a table refers to the changelog topic, and the corresponding internal state store is not visible to the user. We always read a table's changelog topic from the beginning when we run a query on it. So when you run a simple SELECT statement that only includes filter and projection operations on the table, KSQL starts reading from the beginning of the changelog topic, builds a KTable, applies the projection and filter to it, and then emits the result. Note that you can control the emit rate by configuring the buffer and commit interval for the query using Streams config values (in KSQL, use the SET command for this).
Depending on the buffer size and commit interval, you will see _some_ of the intermediate values for a key, but eventually the final value for each key will be emitted. If you terminate the query and run it again, it will again start from the beginning of the table's changelog topic. You may see different intermediate values for a key, but in the end you will see the final value for each key.
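
As a rough illustration of the buffer and commit-interval settings described here, the following sketch adjusts both for the current session before rerunning the query. The values are illustrative assumptions, not recommendations; a larger cache and a longer commit interval generally cause more updates per key to be collapsed before they are emitted, but they do not guarantee a single row.

```sql
-- Read the table's topic from the start, as in step 2.
SET 'auto.offset.reset' = 'earliest';
-- Illustrative values only (assumptions): ~10 MB record cache, 2 s commit interval.
SET 'cache.max.bytes.buffering' = '10000000';
SET 'commit.interval.ms' = '2000';
SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
```

Setting the cache size to 0 disables this buffering, so every record read from the topic is emitted.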

Thanks @hjafarpour. What are the specific config items I can change to get KSQL to not emit the intermediate results?

Thanks @hjafarpour. In that case, referring to @rmoff's 2nd question, we should see 3 results. But he is seeing only 2 results (the first and third); the second (setting the email to 'none') is not emitted.

The Docker image from Confluent doesn't set an important property: ksql.streams.cache.max.bytes.buffering

So set this property in KSQL. Here is the link to the doc: https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html#ksql-streams-cache-max-bytes-buffering

Step 8 should then work as expected: the SELECT query should emit only the single current record instead of the record's history.

For more information on this KSQL property, read this: https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
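
To check which value the running server is actually using, the effective properties can be listed from the KSQL CLI. This is a minimal sketch, assuming the property appears in the listing under the name given above:

```sql
-- List the effective server and session properties; look for
-- ksql.streams.cache.max.bytes.buffering in the output.
SHOW PROPERTIES;
```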
