To reproduce, see here.
Using a Kafka topic populated by Kafka Connect and Debezium, it is possible to build a KSQL table that reflects the current state of an RDBMS table. Usually a SELECT against the KSQL table shows the current state, and if the SELECT is left running, any changes to the RDBMS table (and thus to the underlying Kafka topic) are emitted by the KSQL table SELECT.
However, it has been observed (at 44:36 here) that when running a SELECT, with no changes made to the source data, two rows are emitted from the table.
Step by step (_everything works as expected until step 8, the final cancel-and-rerun of the query_):
Spin up the environment, which configures Debezium, Kafka Connect, etc.
Create a KSQL stream over the source topic, change the partition key (because it will be used in a subsequent join), and create a table over the re-keyed stream:
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM ACCOUNTS_STREAM WITH (KAFKA_TOPIC='asgard.demo.accounts', VALUE_FORMAT='AVRO');
CREATE STREAM ACCOUNTS_REKEYED WITH (PARTITIONS=1) AS SELECT * FROM ACCOUNTS_STREAM PARTITION BY ACCOUNT_ID;
-- This select statement is simply to make sure that we have time for the ACCOUNTS_REKEYED topic
-- to be created before we define a table against it
SELECT * FROM ACCOUNTS_REKEYED LIMIT 1;
CREATE TABLE ACCOUNTS WITH (KAFKA_TOPIC='ACCOUNTS_REKEYED',VALUE_FORMAT='AVRO',KEY='ACCOUNT_ID');
From KSQL, select the table value for key a42. Do not cancel the query.
ksql> SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
a42 | Robin | Moffatt | [email protected] | +44 123 456 789
In MySQL, update the EMAIL value:
mysql> UPDATE accounts SET EMAIL='none' WHERE ACCOUNT_ID='a42';
Query OK, 1 row affected (0.03 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Note that in KSQL a new row is emitted from the KSQL table. This is as expected.
a42 | Robin | Moffatt | none | +44 123 456 789
Recap: in KSQL the table has shown the initial state, and then when it changed, the new state:
a42 | Robin | Moffatt | [email protected] | +44 123 456 789
a42 | Robin | Moffatt | none | +44 123 456 789
Cancel the KSQL table query and rerun it (do not cancel it once it is running).
Just the current state (EMAIL='none') is returned. This is expected:
^CQuery terminated
ksql> SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
a42 | Robin | Moffatt | none | +44 123 456 789
Change the EMAIL value again in MySQL:
mysql> UPDATE accounts SET EMAIL='[email protected]' WHERE ACCOUNT_ID='a42';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Note that in KSQL a new row is emitted from the KSQL table. This is as expected.
a42 | Robin | Moffatt | [email protected] | +44 123 456 789
_This is where the divergence between actual and expected behaviour happened._
As before, cancel the KSQL table query and rerun it.
Expected behaviour: a single row is emitted, showing the current state (EMAIL='[email protected]'):
^CQuery terminated
ksql> SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
a42 | Robin | Moffatt | [email protected] | +44 123 456 789
This is what has always been observed previously, and it is also what is observed now when trying to reproduce the issue.
Observed behaviour:
^CQuery terminated
ksql> SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
a42 | Robin | Moffatt | [email protected] | +44 123 456 789
a42 | Robin | Moffatt | [email protected] | +44 123 456 789
There is a pause of a few seconds between the two rows being emitted.
Two things are strange here:
Why are there two rows emitted, even though the state hasn't changed?
The [email protected] value had already been received and emitted in step 7, so it is not the case that it was only received while this SELECT was running.
The two rows emitted are the first (EMAIL='[email protected]') and third (EMAIL='[email protected]') values of three in total, so it's not the case that the table happened to show the prior value and then the new one. It showed the very first value and then the last, omitting the second one.
This is due to the way tables are implemented in KSQL. In KSQL, a table refers to the changelog topic, and the corresponding internal state store is not visible to the user. We always read the changelog topic for a table from the beginning when we run a query on it. So when you run a simple SELECT statement that only includes filter and projection operations on the table, KSQL starts reading from the beginning of the changelog topic, builds a KTable, applies the projection and filter to it, and then emits the result. Note that you can control the emit rate by configuring the buffer size and commit interval for the query through Streams config values (in KSQL, use the SET command for this).
Depending on the buffer size and commit interval, you will see _some_ of the intermediate values for a key, but eventually the final value for each key will be emitted. If you terminate the query and run it again, it will again start from the beginning of the changelog topic for the table. You may see different intermediate values for a key, but at the end you will see the final value for each key.
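A minimal sketch of the SET approach for the commit interval, run in the KSQL CLI before rerunning the query. It assumes your KSQL version passes `ksql.streams.`-prefixed properties through to Kafka Streams, and the 30-second value is purely illustrative. The idea is that a longer commit interval gives the record cache more time to collapse successive updates to the same key before they are forwarded; it has no such effect if the cache size is 0.

```sql
-- Assumption: 'ksql.streams.' properties are passed to Kafka Streams.
-- Illustrative value: commit (and flush the record cache) every 30 seconds.
SET 'ksql.streams.commit.interval.ms' = '30000';
-- The setting applies to queries started after the SET, so rerun the query.
SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
```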
Thanks @hjafarpour. What are the specific config items I can change to get KSQL to not emit the intermediate results?
Thanks @hjafarpour. In that case, referring to @rmoff's second point, we should see 3 results. But he is seeing only 2 results (first and third); the second one (setting EMAIL to 'none') is not emitted.
The Docker image from Confluent doesn't set an important property: ksql.streams.cache.max.bytes.buffering.
So, set this property in KSQL. Here is the link to the docs: https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html#ksql-streams-cache-max-bytes-buffering
Step 8 should then work as expected: the SELECT query will emit only the single current record instead of the history of the record.
For more information on this KSQL property, read this: https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html
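A hedged sketch of checking and setting this property for the current KSQL CLI session before rerunning step 8. The 10000000 (10 MB) value is an example assumption, not a recommendation; to apply it server-wide, the same key would go into the KSQL server configuration instead.

```sql
-- Inspect the properties currently in effect
-- (look for ksql.streams.cache.max.bytes.buffering in the output).
SHOW PROPERTIES;
-- Example value (10 MB): a non-zero cache lets Kafka Streams collapse
-- successive updates to the same key, so fewer intermediate rows are emitted.
SET 'ksql.streams.cache.max.bytes.buffering' = '10000000';
-- Step 8: rerun the query against the table.
SELECT ACCOUNT_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE FROM ACCOUNTS WHERE ACCOUNT_ID='a42';
```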