cc @MichaelDrogalis
Describe the bug
In working with historical ordering data, there is a need to process events that "happened" (according to event-time semantics) several years ago. These events can be modelled as streams and tables using CREATE STREAM / CREATE TABLE statements that declare the event-time column. When it comes to joining these abstractions using CSAS (and probably CTAS), it _seems_ like a new event time of "now" can be declared, but the new events are actually timestamped with a value from one of the sources. This means multi-year / infinite retention is required to keep the data for more than a few minutes, depending on log cleanup settings.
To Reproduce
Using ksqldb 0.6:
with a table of users that converts registration datetime to event_ts:
Name : KT_USERS
Type : TABLE
Key field : ID
Key format : STRING
Timestamp field : EVENT_TS
Value format : AVRO
Kafka topic : KT_USERS (partitions: 3, replication: 3)
Field | Type
---------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ID | VARCHAR(STRING)
EVENT_TS | BIGINT
REGISTER_DATE | VARCHAR(STRING)
---------------------------------------------
e.g
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|ROWTIME |ROWKEY |ID |EVENT_TS |REGISTER_DATE |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|1429102214000 |100595 |100595 |1429102214000 |2015-04-15 22:50:14 |
and a stream of orders (similarly configured but with order datetime filling event_ts):
Name : KS_ORDERS
Type : STREAM
Key field : PATIENT_ID
Key format : STRING
Timestamp field : EVENT_TS
Value format : AVRO
Kafka topic : KS_ORDERS (partitions: 3, replication: 3)
Field | Type
----------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ORDER_ID | VARCHAR(STRING)
EVENT_TS | BIGINT
PATIENT_ID | VARCHAR(STRING)
ORDER_DATE | VARCHAR(STRING)
AMOUNT_TOTAL | DOUBLE
----------------------------------------------
e.g
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|ROWTIME |ROWKEY |ORDER_ID |EVENT_TS |ORDER_DATE |PATIENT_ID |AMOUNT_TOTAL |
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|1429290276000 |100595 |93 |1429290276000 |2015-04-18 03:04:36 |100595 |82.5 |
|1437927353000 |100595 |660 |1437927353000 |2015-07-27 02:15:53 |100595 |72.9 |
|1449314559000 |100595 |2121 |1449314559000 |2015-12-05 22:22:39 |100595 |72.88 |
creating a new stream that uses unix_timestamp() for the derived stream's timestamp:
CREATE STREAM KS_TEST_ORDERS_WITH_PATIENT WITH (TIMESTAMP='EVENT_TS') AS SELECT UNIX_TIMESTAMP() AS EVENT_TS, O.ORDER_ID, O.EVENT_TS AS ORDER_TS, O.ORDER_DATE, O.PATIENT_ID, P.EVENT_TS AS REGISTER_TS, P.REGISTER_DATE FROM KS_ORDERS O JOIN KT_USERS P ON O.PATIENT_ID = P.ID;
the new events appear to have a ROWTIME of "now":
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|ROWTIME |ROWKEY |EVENT_TS |ORDER_ID |ORDER_TS |ORDER_DATE |PATIENT_ID |REGISTER_TS |REGISTER_DATE |
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|1579477277958 |100595 |1579477277958 |93 |1429290276000 |2015-04-18 03:04:36 |100595 |1429102214000 |2015-04-15 22:50:14 |
|1579477277963 |100595 |1579477277963 |660 |1437927353000 |2015-07-27 02:15:53 |100595 |1429102214000 |2015-04-15 22:50:14 |
|1579477277985 |100595 |1579477277985 |2121 |1449314559000 |2015-12-05 22:22:39 |100595 |1429102214000 |2015-04-15 22:50:14 |
|1579477278005 |100595 |1579477278005 |1483 |1444996150000 |2015-10-16 22:49:10 |100595 |1429102214000 |2015-04-15 22:50:14 |
|1579477290707 |100595 |1579477290707 |93 |1429290275000 |2015-04-18 03:04:35 |100595 |1429102214000 |2015-04-15 22:50:14 |
but the brokers don't agree, and the events are quickly cleaned up due to log retention breach:
[2020-01-19 23:42:59,809] INFO [Log partition=KS_TEST_ORDERS_WITH_PATIENT-0, dir=/opt/kafka/data-0/logs] Found deletable segments with base offsets [0,31,508,1144,1621,1939,2416,2734,3052,3370,3688,4006,4324,4801,5119,5596,5914,6232,6391,6709,6946,7582,8059,8536,9013,9490,9967,11080,11398,12352,13624,14101,15373,16291,16768,17246,17723,18518,19154,20108,21064,21700,22330,23592,24381,24854,25326,25963,26437,27538,28011,29272,30061,30377,31322,32268,33469,33940,34570,35589,36378,37324,38426,39373,40003,40951,41584,42055,43476,44560,45824,46454,47086,47872,49610,50555,51345,52765] due to retention time 604800000ms breach (kafka.log.Log)
[2020-01-19 23:42:59,818] INFO [Log partition=KS_TEST_ORDERS_WITH_PATIENT-1, dir=/opt/kafka/data-0/logs] Found deletable segments with base offsets [0,13,172,331,490,649,808,967,1126,1285,1444,1603,1783,2101,2578,3055,3532,4009,4327,5122,5758,6394,6871,7666,8302,8779,9415,10051,10493,11129,11765,12242,13037,13789,14427,15224,16017,16649,17439,18069,18385,19174,19686,20316,21259,21574,22362,23309,24099,24886,25529,26633,27579,28210,28842,29786,30731,31834,32466,33253,34200,35304,36093,36722,38020,39284,40071,40861,41491,42279,43225,44329,45117,45907,47642] due to retention time 604800000ms breach (kafka.log.Log)
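To make the retention breach concrete, here is a small arithmetic sketch. It assumes, as I understand Kafka's time-based retention, that the broker compares record timestamps in a segment against its own clock, and it approximates "now" with the UNIX_TIMESTAMP() value from the query output above. The helper name `is_past_retention` is made up for illustration.

```python
RETENTION_MS = 604_800_000  # retention time reported in the broker log lines (7 days)
MS_PER_DAY = 86_400_000


def is_past_retention(record_ts_ms: int, now_ms: int, retention_ms: int = RETENTION_MS) -> bool:
    """Return True when a record timestamp is older than the retention window."""
    return now_ms - record_ts_ms > retention_ms


# Assumption: "now" is approximated by the UNIX_TIMESTAMP() value seen in the query output.
now_ms = 1579477277958
# Newest order timestamp among the sample KS_ORDERS rows (an order from December 2015).
order_ts_ms = 1449314559000

age_days = (now_ms - order_ts_ms) / MS_PER_DAY
print(f"record age in days: {age_days:.1f}")
print("2015 timestamp past retention:", is_past_retention(order_ts_ms, now_ms))
print("'now' timestamp past retention:", is_past_retention(now_ms, now_ms))
```

If the sink records really carried the "now" timestamp, they would sit inside the 7-day window. A 2015 timestamp is years outside it, which is consistent with the segments being marked deletable almost immediately.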
Expected behavior
Events generated with an explicit event timestamp should both return that value for their ROWTIME (which happens) and use that value for log cleanup management (which doesn't seem to happen)
Additional context
Similar concepts & ideas being discussed in #836
Thanks for reporting @terryf82! @apurvam I marked this as a blocker for the next release if we don't think it's too late in the cycle.
Hmm, this seems like a bug. @spena, can you take a look? It should be fairly easy to reproduce.
You're right @apurvam. This is very easy to reproduce. I used the same rows described in the issue, and when I created the sink topic with the CSAS, the rows were there temporarily but were then removed by Kafka because of the retention time.
https://github.com/confluentinc/ksql/issues/836 is discussing changing the semantics for WITH TIMESTAMP, which might cover the fix for this bug. I'll continue discussing this issue and find a solution for it.
Fairly certain this confused the 'ell out of me too. When I asked about it, it was stated this is intended behaviour.
Given:
CREATE STREAM KS_TEST_ORDERS_WITH_PATIENT WITH (TIMESTAMP='EVENT_TS') AS SELECT UNIX_TIMESTAMP() AS EVENT_TS, O.ORDER_ID, O.EVENT_TS AS ORDER_TS, O.ORDER_DATE, O.PATIENT_ID, P.EVENT_TS AS REGISTER_TS, P.REGISTER_DATE FROM KS_ORDERS O JOIN KT_USERS P ON O.PATIENT_ID = P.ID;
The WITH(TIMESTAMP='EVENT_TS') does not affect the timestamp of KS_TEST_ORDERS_WITH_PATIENT. It only affects the timestamp of sources derived from KS_TEST_ORDERS_WITH_PATIENT.
To maybe help explain this, let's consider an example where we have a topic containing an event stream. The timestamp on the message/row is the time the message was produced to Kafka. The message contains a field containing the timestamp of when the event actually occurred, e.g. a record may have a record timestamp of a few seconds ago, but an event time of a week ago.
When we import this into KSQL we can do so using:
CREATE STREAM EVENTS (..., EVENT_TS BIGINT, ... ) WITH (TIMESTAMP='EVENT_TS', ...);
Note: as described above, the data in the topic backing EVENTS has different values for the record's timestamp and the EVENT_TS field, and so far setting the TIMESTAMP to EVENT_TS hasn't actually done anything.
Now, we come to _use_ EVENTS as a source:
CREATE STREAM PROCESSED_EVENTS AS SELECT ... FROM EVENTS;
When KSQL reads each record/row from the events topic it will read EVENT_TS and use that, rather than the record's timestamp, to populate ROWTIME. When KSQL serializes the rows to the PROCESSED_EVENTS topic the value in ROWTIME is used to set the timestamp on the record.
Note: the record timestamp in the PROCESSED_EVENTS topic will match the EVENT_TS timestamp.
Now let's think about your scenario, where you're providing a different TIMESTAMP in the WITH clause:
CREATE STREAM PROCESSED_EVENTS WITH(TIMESTAMP='EVENT_TS2') AS SELECT UNIX_TIMESTAMP() AS EVENT_TS2, ... FROM EVENTS;
Just like in the CREATE STREAM EVENTS statement the WITH(TIMESTAMP) clause has no effect on how PROCESSED_EVENTS will be serialized, only on how downstream sinks will be serialized.
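The behaviour described above can be sketched as a toy model. This is not ksqlDB code: `Record`, `read_rowtime` and `write_to_sink` are hypothetical names, and the model only encodes the rules stated in this comment (ROWTIME comes from the source's declared TIMESTAMP column, the sink record's timestamp comes from ROWTIME, and the sink's own WITH(TIMESTAMP) only matters when something later reads from it).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Record:
    timestamp: int  # Kafka record timestamp in ms
    value: dict     # deserialized columns of the row


def read_rowtime(record: Record, timestamp_column: Optional[str]) -> int:
    """ROWTIME on read: the declared TIMESTAMP column if there is one, else the record timestamp."""
    if timestamp_column is None:
        return record.timestamp
    return record.value[timestamp_column]


def write_to_sink(rowtime: int, projected: dict) -> Record:
    """On write, the record timestamp is set from ROWTIME; the sink's WITH(TIMESTAMP) is not consulted."""
    return Record(timestamp=rowtime, value=projected)


# Source topic backing EVENTS: produced "now", but EVENT_TS says 2015.
produced_ms = 1579477277958
source = Record(timestamp=produced_ms, value={"EVENT_TS": 1429290276000, "ORDER_ID": "93"})

# CSAS with WITH(TIMESTAMP='EVENT_TS2') and UNIX_TIMESTAMP() AS EVENT_TS2.
rowtime = read_rowtime(source, "EVENT_TS")
sink = write_to_sink(rowtime, {"EVENT_TS2": produced_ms, "ORDER_ID": source.value["ORDER_ID"]})
print("sink record timestamp:", sink.timestamp)

# A later query that reads the sink uses the sink's declared TIMESTAMP column.
downstream_rowtime = read_rowtime(sink, "EVENT_TS2")
downstream = write_to_sink(downstream_rowtime, dict(sink.value))
print("ROWTIME when querying the sink:", downstream_rowtime)
print("downstream record timestamp:", downstream.timestamp)
```

In this model the sink record keeps the 2015 timestamp, which is what the broker sees for retention, while anything querying the sink sees a ROWTIME of "now". That matches the mismatch reported in the issue.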
Yes, I know this is _highly_ confusing and unintuitive! (Hence one of the first GitHub issues I raised was https://github.com/confluentinc/ksql/issues/836.)
I actually think this is bad behaviour. In general, things in the WITH clause of a CREATE STREAM or CREATE TABLE statement affect the source (topic), and things in the WITH clause of a CREATE STREAM AS SELECT or CREATE TABLE AS SELECT affect the sink (topic), e.g. formats. TIMESTAMP breaks from this pattern.
Where does this leave me?
Good question!
If you were to duplicate KS_TEST_ORDERS_WITH_PATIENT into another stream:
CREATE STREAM KS_TEST_ORDERS_WITH_PATIENT_2 AS SELECT * FROM KS_TEST_ORDERS_WITH_PATIENT;
Then the data in KS_TEST_ORDERS_WITH_PATIENT_2 would have the correct rowtime/record.timestamp.
Of course, there's still a chance the broker will delete your data before the above duplication gets a chance to run in KSQL. You can mitigate this by configuring the KS_TEST_ORDERS_WITH_PATIENT topic to delete based on size, not time, i.e. by setting retention.bytes to some suitably large value to allow KSQL to process the data.
However, the true fix is probably to make WITH(TIMESTAMP) in your CREATE STREAM AS SELECT statement do what you thought it did.
We should probably close this as a duplicate of #836
Closing as duplicate now that #836 is being prioritised.