Describe the bug
When I set auto.offset.reset to earliest and run a windowed query, I get WINDOWSTART times that correspond to the current system time.
To Reproduce
Steps to reproduce the behavior:
SELECT ticker, WINDOWSTART(), COUNT(*) AS ticks
FROM prices
WINDOW TUMBLING( SIZE 60 SECONDS )
WHERE ticker = 'AAPL'
GROUP BY ticker
EMIT CHANGES;
SET 'auto.offset.reset' = 'earliest';
SELECT ticker, WINDOWSTART(), COUNT(*) AS ticks
FROM prices
WINDOW TUMBLING( SIZE 60 SECONDS )
WHERE ticker = 'AAPL'
GROUP BY ticker
EMIT CHANGES;
Expected behavior
I would expect that the second select would generate WINDOWSTART times reflecting the ROWTIME from the first messages in the topic.
Actual behaviour
The WINDOWSTART times actually reflect the current (system/wall-clock) time. The counts are also incorrect, because records are going into the wrong bucket.
If I do something like the following, I can get the right counts (because I'm effectively bypassing the windowing function), but it clearly shows the discrepancy between what I expect and the WINDOWSTART:
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm'), ticker, WINDOWSTART(), COUNT(*) AS ticks
FROM prices
WINDOW TUMBLING( SIZE 60 SECONDS )
WHERE ticker = 'AAPL'
GROUP BY TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm'), ticker
EMIT CHANGES;
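Grouping on a per-minute string derived from ROWTIME presumably gives the right counts because it produces the same buckets as an epoch-aligned 60-second tumbling window. A small Python check, assuming the timestamps are formatted in UTC (the server's time zone may differ) and using timestamps from the sample data posted later in this thread; the helper names are hypothetical:

```python
from datetime import datetime, timezone

WINDOW_SIZE_MS = 60_000  # matches WINDOW TUMBLING( SIZE 60 SECONDS )


def minute_bucket(ts_ms: int) -> str:
    """Format a millisecond timestamp like 'yyyy-MM-dd HH:mm' (UTC assumed)."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M")


def window_start(ts_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Epoch-aligned start of the 60-second tumbling window containing ts_ms."""
    return ts_ms - (ts_ms % size_ms)


# Records in the same minute share both the minute string and the window start.
for ts in (1587368476000, 1587368476001, 1587368536000):
    print(ts, minute_bucket(ts), window_start(ts))
```

Because every minute string maps to exactly one window start (in UTC), the two groupings agree as long as both are computed from the same timestamp.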
I would expect that the second select would generate WINDOWSTART times reflecting the ROWTIME from the first messages in the topic.
This is not how it works. Windows have a "fixed alignment" (to make them deterministic). For example, tumbling windows with a size of 5000 ms have predictable window boundaries [0;5000), [5000;10000), ..., and not [1000;6000), [6000;11000), ... or something "random" like [1452;6452), [6452;11452), ....
Hence, the window start timestamp is not related to the current system time.
The behavior you describe, with window boundaries aligned to a record's timestamp, is what "sliding windows" would provide; however, "sliding windows" are currently not supported.
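To make the fixed alignment concrete, here is a minimal Python sketch of epoch-aligned tumbling windows. It assumes non-negative millisecond timestamps and only models the arithmetic described above; the function name tumbling_window is hypothetical, and this is not Kafka Streams' actual implementation.

```python
from typing import Tuple


def tumbling_window(ts_ms: int, size_ms: int) -> Tuple[int, int]:
    """Return the [start, end) tumbling window that contains ts_ms.

    Windows are aligned to the epoch (timestamp 0), so the boundaries depend
    only on the window size, never on the first record or on wall-clock time.
    """
    start = ts_ms - (ts_ms % size_ms)
    return start, start + size_ms


if __name__ == "__main__":
    # With a 5000 ms size, every timestamp falls into one of
    # [0;5000), [5000;10000), [10000;15000), ...
    for ts in (0, 1452, 4999, 5000, 6452, 11452):
        print(ts, tumbling_window(ts, 5000))
```

Note that 1452 lands in [0;5000), not in a window starting at 1452.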
Does this make sense?
@bpaulsen Ditto what @mjsax said.
I'm still a little confused about what you're expecting (some sample data with inputs and outputs would go a long way). But here's an example of how this is expected to work in practice. It uses ksqlDB 0.8.1 at the time of writing (it's kept up to date and programmatically validated).
Let's say I have 2 messages every minute for the past 4 hours. If I do a SELECT, group by the symbol ID (in this case, ticker), and set my offset to earliest, then I'd expect the WINDOWSTART for a message from 4 hours ago to reflect a timestamp from 4 hours ago.
Instead, I get a WINDOWSTART reflecting now.
As a result, my first grouping shows about 480 messages in the first minute, because they all have a WINDOWSTART of now.
I'll put in some sample data later tonight though
Sample data:
CREATE STREAM prices (ticker VARCHAR, price DOUBLE, timestamp BIGINT)
WITH (kafka_topic='prices',
key='ticker',
timestamp='timestamp',
partitions=8,
value_format='avro');
INSERT INTO prices (ticker, price, timestamp) VALUES ('AAPL', 8.2, 1587368476000);
INSERT INTO prices (ticker, price, timestamp) VALUES ('AAPL', 8.2, 1587368476001);
INSERT INTO prices (ticker, price, timestamp) VALUES ('GOOG', 8.2, 1587368476002);
INSERT INTO prices (ticker, price, timestamp) VALUES ('GOOG', 8.2, 1587368476003);
INSERT INTO prices (ticker, price, timestamp) VALUES ('AAPL', 8.2, 1587368536000);
INSERT INTO prices (ticker, price, timestamp) VALUES ('AAPL', 8.2, 1587368536001);
INSERT INTO prices (ticker, price, timestamp) VALUES ('GOOG', 8.2, 1587368536002);
INSERT INTO prices (ticker, price, timestamp) VALUES ('GOOG', 8.2, 1587368536003);
INSERT INTO prices (ticker, price, timestamp) VALUES ('AAPL', 8.2, 1587433336000);
INSERT INTO prices (ticker, price, timestamp) VALUES ('AAPL', 8.2, 1587433336001);
INSERT INTO prices (ticker, price, timestamp) VALUES ('GOOG', 8.2, 1587433336002);
INSERT INTO prices (ticker, price, timestamp) VALUES ('GOOG', 8.2, 1587433336003);
CREATE STREAM prices2
WITH ( KAFKA_TOPIC='prices',
VALUE_FORMAT='Avro',
TIMESTAMP = 'timestamp'
);
Run these two queries:
QUERY 1
select ticker, windowstart(), count(*) as ticks
from prices
window tumbling( size 60 second )
where ticker = 'AAPL'
group by ticker
emit changes;
QUERY 2
select ticker, windowstart(), count(*) as ticks
from prices2
window tumbling( size 60 second )
where ticker = 'AAPL'
group by ticker
emit changes;
I would expect these to have identical outputs (prices2 represents a topic that is created outside of KSQL). Instead, I get these results:
QUERY 1 RESULTS
+--------------------------+--------------------------+--------------------------+
|TICKER |KSQL_COL_1 |TICKS |
+--------------------------+--------------------------+--------------------------+
|AAPL |1587368460000 |2 |
|AAPL |1587368520000 |2 |
|AAPL |1587433320000 |2 |
QUERY 2 RESULTS
+--------------------------+--------------------------+--------------------------+
|TICKER |KSQL_COL_1 |TICKS |
+--------------------------+--------------------------+--------------------------+
|AAPL |1587435840000 |6 |
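For what it's worth, the Query 1 numbers are exactly what epoch-aligned 60-second windows give for the six AAPL timestamps inserted above, and the Query 2 row is what you would see if all six records carried roughly the same wall-clock timestamp instead. A minimal Python sketch; the helper names are hypothetical, and the wall-clock value is a made-up point inside the 1587435840000 window:

```python
from collections import Counter

WINDOW_SIZE_MS = 60_000  # WINDOW TUMBLING( SIZE 60 SECOND )

# The AAPL timestamps from the INSERT statements above.
AAPL_TIMESTAMPS = [
    1587368476000, 1587368476001,
    1587368536000, 1587368536001,
    1587433336000, 1587433336001,
]


def window_start(ts_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Epoch-aligned start of the tumbling window containing ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def count_per_window(timestamps):
    """Mimic COUNT(*) ... GROUP BY ticker for a single ticker, keyed by WINDOWSTART."""
    return dict(Counter(window_start(ts) for ts in timestamps))


# Query 1: the event-time field is honored, so the six records land in three windows.
print(count_per_window(AAPL_TIMESTAMPS))

# Query 2: if every record is stamped with (roughly) the same wall-clock time,
# all six collapse into one window. The +5000 ms offset is hypothetical.
wallclock = 1587435840000 + 5000
print(count_per_window([wallclock] * len(AAPL_TIMESTAMPS)))
```

The first print matches the three Query 1 rows (2 records each) and the second matches the single Query 2 row (6 records).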
I'll put in some sample data later tonight though
Perfect. A small working example would be super helpful to reproduce the issue. There is a lot of testing in place, so an example that reproduces the issue would be the quickest way to get to the bottom of this.
Just posted
On Apr 20, 2020, at 9:59 PM, Matthias J. Sax notifications@github.com wrote:
Reply to this email directly, view it on GitHub https://github.com/confluentinc/ksql/issues/5121#issuecomment-616904636, or unsubscribe https://github.com/notifications/unsubscribe-auth/ACBGWSCYI6TEB2THESEZM3TRNT4YDANCNFSM4MM2EYWA.
Both queries in your post are the same? Some c&p error?
yes, c&p error. Fixed now, and output is easier to read
BTW, prices2 is meant to represent a topic created outside of KSQL. It seems like the timestamp field is not being respected. That said, the ROWTIME of my real-life topic is within a few milliseconds of its timestamp field.
Is the source topic configured with message.timestamp.type=LogAppendTime (or log.message.timestamp.type=LogAppendTime as a default at the broker level)? I've seen a similar issue occur in the past in some versions when this is the case.
If so, one possible solution is to configure the following property in ksql-server.properties: ksql.streams.topic.message.timestamp.type=CreateTime
Note: this behavior is likely related to https://issues.apache.org/jira/browse/KAFKA-6614, and should be resolved as of ksqlDB 0.7+ and the upcoming CP 5.5.0 release of ksqlDB.
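A rough Python model of why the timestamp type matters: under LogAppendTime the broker overwrites the producer-supplied timestamp with its own clock when a record is appended, while CreateTime keeps it. For illustration only, it assumes the record is re-written to some intermediate topic before windowing; the names and the broker clock value are hypothetical, and this does not reproduce ksqlDB or Kafka Streams internals.

```python
from enum import Enum

WINDOW_SIZE_MS = 60_000


class TimestampType(Enum):
    CREATE_TIME = "CreateTime"
    LOG_APPEND_TIME = "LogAppendTime"


def stored_timestamp(producer_ts_ms: int, broker_now_ms: int, ts_type: TimestampType) -> int:
    """Timestamp a broker keeps for an appended record, given the topic's timestamp type."""
    if ts_type is TimestampType.LOG_APPEND_TIME:
        # The broker replaces the producer-supplied timestamp with its own clock.
        return broker_now_ms
    # CreateTime: the producer-supplied timestamp is stored unchanged.
    return producer_ts_ms


def window_start(ts_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Epoch-aligned start of the tumbling window containing ts_ms."""
    return ts_ms - (ts_ms % size_ms)


# Hypothetical values: an event time from the sample data, and a broker clock
# reading taken when the record is re-written to an intermediate topic.
event_ts = 1587368476000
broker_now = 1587435845000

for ts_type in TimestampType:
    ts = stored_timestamp(event_ts, broker_now, ts_type)
    print(ts_type.value, ts, window_start(ts))
```

Under CreateTime the record stays in its event-time window; under LogAppendTime it moves to the window of the broker's clock, which is the symptom reported in this issue.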
I can confirm that log.message.timestamp.type was changed to LogAppendTime at the broker level. Setting the property in ksql-server.properties fixed the issue.
As far as I'm concerned, we can consider this closed. Should I be the one closing the issue?
Great, thanks for helping find the root cause @mikebin!
Great find @mikebin!