Ksql: How to get window start or end time from kafka topic corresponding to KSQL windowed table?

Created on 26 Jun 2018 · 14 Comments · Source: confluentinc/ksql

ksql> select * from DEV_MONITOR_RULE_2557_104782_233_2_TABLE;

1530008581051 | 2557 : Window{start=1530008520000 end=-} | 2557 | 2 | 1530008581051 
./bin/kafka-console-consumer --zookeeper 10.12.0.157:2181 --topic DEV_MONITOR_RULE_2557_104782_233_2_TABLE

{"HITCOUNTS":2,"TENANTID":2557,"HITTIME":1530008581051} 

I want to get the window start timestamp (here, 1530008520000) from the output of the kafka-console-consumer command. How can I achieve this? Thanks!

Why are there special characters in the message key when it is consumed and printed, e.g. 2557d;���?

 Table Name                                  | Kafka Topic                                 | Format | Windowed 
---------------------------------------------------------------------------------------------------------------
 DEV_MONITOR_RULE_2557_104782_233_2_TABLE    | DEV_MONITOR_RULE_2557_104782_233_2_TABLE    | JSON   | true  

ksql> select ROWKEY from DEV_MONITOR_RULE_2557_104782_233_2_TS_TABLE;
2557 : Window{start=1530008520000 end=-}   

I guess the actual message key is 2557 : Window{start=1530008520000 end=-}, right?

enhancement

All 14 comments

Hi @ChenShuai1981, right now there is no way to extract the window start time from the message. Your analysis is correct: the window information is in the key, encoded in binary format. Executing SELECT ROWKEY converts it to the string that you see.

What we need is a scalar function that parses the start time out of the row key. This should be fairly easy to implement. If such a function were implemented, you could run SELECT WINDOWSTARTTIME(), which would return the window start time as a long.

I'll mark this as an enhancement request.
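
To illustrate the binary key encoding mentioned above, here is a minimal Python sketch of decoding such a key outside KSQL. It assumes the layout used by the Kafka Streams time-windowed key serializer at the time of this thread: the serialized key bytes followed by the window start as an 8-byte big-endian long. That layout is an assumption to verify against your Kafka version (session-windowed keys are laid out differently), and the function name `split_windowed_key` is hypothetical.

```python
import struct
from typing import Tuple

TIMESTAMP_SIZE = 8  # assumed: window start is stored as an 8-byte big-endian long


def split_windowed_key(raw: bytes) -> Tuple[bytes, int]:
    """Split a time-windowed message key into (key bytes, window start in ms).

    Assumes the raw key is the serialized key followed by the window start
    timestamp. This is a sketch, not the KSQL or Kafka Streams API.
    """
    if len(raw) < TIMESTAMP_SIZE:
        raise ValueError("key is too short to contain a window start timestamp")
    key_bytes = raw[:-TIMESTAMP_SIZE]
    (window_start_ms,) = struct.unpack(">q", raw[-TIMESTAMP_SIZE:])
    return key_bytes, window_start_ms


# Example key built from the values in the question: "2557" plus the window start.
raw_key = b"2557" + struct.pack(">q", 1530008520000)
key, start = split_windowed_key(raw_key)
print(key.decode("utf-8"), start)
```

Under this assumption the timestamp bytes are 00 00 01 64 3B 9D 45 40. Bytes 0x64 and 0x3B are the ASCII characters "d" and ";", which is consistent with the "2557d;" prefix seen when the key is printed as text.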

Hey @apurvam - I thought the timestamp of the output message (the one containing the aggregate) got set to the window start time - is that no longer the case?

@blueedgenick, I think the timestamp of the output message is the timestamp of the message that generated the output, not the window start time. I will cross-check and report back.

Quick test with the 'ratings' dataset from the datagen tool, on 5.0.x:

create stream ratings ( rating_id bigint, user_id int, stars int, rating_time bigint, channel varchar, message varchar) with (kafka_topic = 'ratings', value_format = 'json');

create table windowtest as select user_id, count(message) as message_count from ratings window tumbling(size 30 seconds) group by user_id;

select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss.SSS') as ts, rowkey, user_id, message_count from windowtest where user_id = 2;

2018-06-27 00:58:00.000 | 2 : Window{start=1530061080000 end=-} | 2 | 1
2018-06-27 00:58:00.000 | 2 : Window{start=1530061080000 end=-} | 2 | 3
2018-06-27 00:58:00.000 | 2 : Window{start=1530061080000 end=-} | 2 | 4
2018-06-27 00:58:00.000 | 2 : Window{start=1530061080000 end=-} | 2 | 5
2018-06-27 00:58:00.000 | 2 : Window{start=1530061080000 end=-} | 2 | 6
2018-06-27 00:58:30.000 | 2 : Window{start=1530061110000 end=-} | 2 | 1
2018-06-27 00:58:30.000 | 2 : Window{start=1530061110000 end=-} | 2 | 2
2018-06-27 00:58:30.000 | 2 : Window{start=1530061110000 end=-} | 2 | 3
2018-06-27 00:58:30.000 | 2 : Window{start=1530061110000 end=-} | 2 | 4
2018-06-27 00:59:00.000 | 2 : Window{start=1530061140000 end=-} | 2 | 2
2018-06-27 00:59:00.000 | 2 : Window{start=1530061140000 end=-} | 2 | 3
2018-06-27 00:59:00.000 | 2 : Window{start=1530061140000 end=-} | 2 | 4
^CQuery terminated
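
The window starts in the output above are all multiples of the 30-second window size. As a rough sketch, assuming tumbling windows are aligned to the Unix epoch and timestamps are non-negative, the window bounds for a record timestamp can be derived as follows. The function names and the sample record timestamp 1530061095123 are hypothetical.

```python
from datetime import datetime, timezone
from typing import Tuple


def tumbling_window_bounds(timestamp_ms: int, size_ms: int) -> Tuple[int, int]:
    """Return (start, end) in ms of the epoch-aligned tumbling window
    that contains timestamp_ms. The end is exclusive."""
    start = timestamp_ms - (timestamp_ms % size_ms)
    return start, start + size_ms


def format_utc(timestamp_ms: int) -> str:
    """Format an epoch-millisecond timestamp as a UTC string (seconds precision)."""
    seconds = timestamp_ms // 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


# A hypothetical record timestamp that falls inside the first window shown above.
start, end = tumbling_window_bounds(1530061095123, 30_000)
print(start, end, format_utc(start))
```

Because the end of a tumbling window is always the start plus the window size, only the start needs to be stored in the key, which would explain why the key is printed with end=-.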

+1 to what @blueedgenick says; this is certainly what I've always observed and communicated externally. If this is changing, please let us know :)

@hjafarpour can you help get to the bottom of this?

Currently the timestamp for the windowed table is set to the start of the window as @blueedgenick said.

The observation is correct, but this is not intended behavior; it is actually a bug in Kafka Streams. We are going to fix it in the next release (and maybe in some bug-fix releases). You should not rely on the current behavior; instead, extract the window start timestamp from the key.

Fixed in Apache Kafka trunk via: https://github.com/apache/kafka/pull/5423

@mjsax what will the message timestamp become after this fix? Which version of CP will this be in?

@apurvam @hjafarpour @blueedgenick currently there's no way to extract the timestamp from the key in KSQL, is that right? If so, then this move away (whether a 'bug' or not :) ) from the message timestamp being the window start time could break things. Several demos, including our own (e.g. https://github.com/confluentinc/quickstart-demos/blob/4.1.1-post/mysql-debezium/connector_elasticsearch.config, etc) use the org.apache.kafka.connect.transforms.InsertField$Value Single Message Transform to take the message timestamp to be able to store a calculated aggregate in a target datastore such as Elasticsearch. (/cc @ybyzek )

The PR above targets AK 2.1 / CP 5.1; however, we also back-ported it to AK 2.0.1 / CP 5.0.1, AK 1.1.2 / CP 4.1.2, and older releases.

With the current fix, the timestamp will be that of the last record that updated the aggregation result. Because processing order is not guaranteed, the timestamps are assigned non-deterministically and you cannot rely on them at the moment. Note that in AK 2.0 / CP 5.0 there is no official contract that provides any semantic guarantees for the result record timestamps.

We have work in progress to change this and use the maximum timestamp over all records that contributed to the result record. If we can finish this work, it will be part of AK 2.1 / CP 5.1 (and we cannot back-port it). If this happens, we plan to make it a public contract and semantic guarantee and will document it accordingly. It is not guaranteed, though, that we can finish the work on timestamp semantics for AK 2.1 / CP 5.1. Thus, as long as you don't see anything in the docs, assume that there is no public contract.

Even if you cannot extract the timestamp from the key in KSQL itself, a custom SMT should be able to extract the timestamp from the key using a TimeWindowedSerde or SessionWindowedSerde.

@apurvam @hjafarpour @blueedgenick currently there's no way to extract the timestamp from the key in KSQL, is that right?

Today, there isn't a way. However, we should easily be able to add a UDF for this. I filed #1674 to track the feature request.

Closing to track in #1674.
