Ksql: CP 5.5 JOIN, GROUP BY and WINDOW do not work together

Created on 29 Jul 2020 · 13 Comments · Source: confluentinc/ksql

Describe the bug
There seems to be a regression in CP 5.5. The following query runs on CP 5.4, but on CP 5.5.1 it fails with the error shown under "Actual behaviour":

CREATE TABLE test AS SELECT 
   p.userid, 
   COUNT(*) as t_count 
FROM  PAGEVIEWS_ORIGINAL p 
LEFT JOIN USERS_ORIGINAL u ON p.userid=u.userid 
WINDOW TUMBLING (SIZE 1 MINUTE) 
GROUP BY p.userid 
HAVING COUNT(*) > 2;

To Reproduce
Run the above query in the Pageview demo context on both CP 5.4 and CP 5.5.1

Expected behavior
The query should run on CP 5.5.1 as it does on CP 5.4.

Actual behaviour
On CP 5.5.1 the following error message is shown:

ksql> CREATE TABLE test AS SELECT p.userid, COUNT(*) as t_count
>FROM  PAGEVIEWS_ORIGINAL p
>LEFT JOIN USERS_ORIGINAL u ON p.userid=u.userid
>WINDOW TUMBLING (SIZE 1 MINUTE)
>GROUP BY p.userid
>HAVING COUNT(*) > 2;
expected one element but was: <io.confluent.ksql.analyzer.Analysis$AliasedDataSource@2b97286d, io.confluent.ksql.analyzer.Analysis$AliasedDataSource@4d7cd0ac>
ksql> 


blocker bug

All 13 comments

Hmm, I can't repro on master. The following QTT passes:

    {
      "name": "count and having and join",
      "statements": [
        "CREATE STREAM pageviews (userID INT) WITH (kafka_topic='pageviews', value_format='JSON');",
        "CREATE TABLE users (userID INT PRIMARY KEY, col1 VARCHAR) WITH (kafka_topic='users', value_format='JSON');",
        "CREATE TABLE out AS SELECT p.userID, count(*) AS count FROM pageviews p LEFT JOIN users u ON p.userID = u.userID WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY p.userid HAVING COUNT(*) > 2;"
      ],
      "inputs": [
        {"topic": "users", "key": 1, "value": {"col1": "one"}},
        {"topic": "pageviews", "value": {"userID": 1}},
        {"topic": "pageviews", "value": {"userID": 1}},
        {"topic": "pageviews", "value": {"userID": 1}}
      ],
      "outputs": [
        {"topic": "OUT", "key": 1, "value": null, "window": {"start": 0, "end": 60000, "type": "time"}},
        {"topic": "OUT", "key": 1, "value": null, "window": {"start": 0, "end": 60000, "type": "time"}},
        {"topic": "OUT", "key": 1, "value": {"COUNT": 3}, "window":  {"start": 0, "end": 60000, "type": "time"}}
      ]
    }

I will try on 5.5.

Hmm, I can't repro on 5.5 either. The bug might be environmental. What other commands did you run?

Here are the statements I ran, the datagen was quickstart (pageview, users):


CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar)
WITH (kafka_topic='pageviews', value_format='json');

CREATE TABLE users_original(registertime bigint, userid varchar, regionid varchar, gender varchar)
 WITH (kafka_topic='users', value_format='json', key = 'userid');


CREATE TABLE test AS SELECT p.userid, COUNT(*) as t_count FROM  PAGEVIEWS_ORIGINAL p LEFT JOIN USERS_ORIGINAL u ON p.userid=u.userid WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY p.userid HAVING COUNT(*) > 2;

I use CP 5.5.1, both server and CLI.

OK confirmed that I can reproduce this on 5.5.1 but not on master. I'll triage the impact and report back :)

    {
      "name": "count and having and join",
      "statements": [
        "CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='json');",
        "CREATE TABLE users_original(registertime bigint, userid varchar, regionid varchar, gender varchar) WITH (kafka_topic='users', value_format='json', key='userid');",
        "CREATE TABLE test AS SELECT p.userid, COUNT(*) as t_count FROM  PAGEVIEWS_ORIGINAL p LEFT JOIN USERS_ORIGINAL u ON p.userid=u.userid WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY p.userid HAVING COUNT(*) > 2;"
      ],
      "inputs": [
        {"topic": "users", "key": "1", "value": {"regionid": "one"}},
        {"topic": "pageviews", "value": {"userID": "1"}},
        {"topic": "pageviews", "value": {"userID": "1"}},
        {"topic": "pageviews", "value": {"userID": "1"}}
      ],
      "outputs": [
        {"topic": "TEST", "key": "1", "value": null, "window": {"start": 0, "end": 60000, "type": "time"}},
        {"topic": "TEST", "key": "1", "value": null, "window": {"start": 0, "end": 60000, "type": "time"}},
        {"topic": "TEST", "key": "1", "value": {"COUNT": 3}, "window":  {"start": 0, "end": 60000, "type": "time"}}
      ]
    }

The bug happens here (QueryAnalyzer:L261):

    final AliasedDataSource source = Iterables.getOnlyElement(analysis.getFromDataSources());

It expects exactly one source in the GROUP BY, but a join has several, so getOnlyElement throws the "expected one element" error from the report.
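
To illustrate the failure mode, here is a minimal Java sketch. The class SingleSourceAssumption and the helper onlyElement are hypothetical names written for this example; they are not the Guava or ksqlDB source. The helper only mirrors the contract of Iterables.getOnlyElement (return the sole element, throw if there are none or more than one), and the sources are plain strings rather than AliasedDataSource objects.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class SingleSourceAssumption {

  // Hypothetical stand-in for Guava's Iterables.getOnlyElement:
  // returns the sole element, or throws if there are zero or several.
  static <T> T onlyElement(final Iterable<T> items) {
    final Iterator<T> it = items.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("expected one element but was empty");
    }
    final T first = it.next();
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected one element but was: " + items);
    }
    return first;
  }

  public static void main(final String[] args) {
    // A query without a join has one source, so the lookup succeeds.
    System.out.println(onlyElement(List.of("PAGEVIEWS_ORIGINAL")));

    // A join contributes two sources, so the same lookup throws.
    try {
      onlyElement(List.of("PAGEVIEWS_ORIGINAL", "USERS_ORIGINAL"));
    } catch (final IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Any fix has to stop assuming a single source at this point, for example by considering all of the join's sources, which is where the window-bounds question in the next comment comes in.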

Fixing this is non-trivial if we also want to be able to select WINDOWSTART and WINDOWEND. A simple fix makes the query from the original issue work, but the test below is more comprehensive:

    {
      "name": "aggregate windowed join",
      "statements": [
        "CREATE STREAM A (ROWKEY VARCHAR KEY, col1 VARCHAR) WITH (kafka_topic='a', value_format='JSON');",
        "CREATE TABLE B (ROWKEY VARCHAR KEY, col1 VARCHAR) WITH (kafka_topic='b', value_format='JSON');",
        "CREATE TABLE C AS SELECT A.ROWKEY, collect_list(A.COL1), collect_list(B.COL1), WINDOWSTART as WSTART, WINDOWEND AS WEND FROM A JOIN B on A.ROWKEY = B.ROWKEY WINDOW TUMBLING (SIZE 10 MILLISECONDS) GROUP BY a.ROWKEY;"
      ],
      "inputs": [
        {"topic": "b", "key": "1", "value": {"col1": "B1"}},
        {"topic": "a", "key": "1", "value": {"col1": "A1"}},
        {"topic": "a", "key": "1", "value": {"col1": "A2"}},
        {"topic": "b", "key": "1", "value": {"col1": "B2"}},
        {"topic": "a", "key": "1", "value": {"col1": "A3"}},
        {"topic": "a", "key": "1", "value": {"col1": "A4"}, "timestamp": 12}
      ],
      "outputs": [
        {"topic": "C", "key": "1", "value": {"A_ROWKEY": "1", "KSQL_COL_1": ["A1"], "KSQL_COL_2": ["B1"]}, "window": {"start": 0, "end": 10, "type": "time"}},
        {"topic": "C", "key": "1", "value": {"A_ROWKEY": "1", "KSQL_COL_1": ["A1", "A2"], "KSQL_COL_2": ["B1", "B1"]}, "window": {"start": 0, "end": 10, "type": "time"}},
        {"topic": "C", "key": "1", "value": {"A_ROWKEY": "1", "KSQL_COL_1": ["A1", "A2", "A3"], "KSQL_COL_2": ["B1", "B1", "B2"]}, "window": {"start": 0, "end": 10, "type": "time"}},
        {"topic": "C", "key": "1", "value": {"A_ROWKEY": "1", "KSQL_COL_1": ["A4"], "KSQL_COL_2": ["B2"]}, "window": {"start": 10, "end": 20, "type": "time"}}
      ]
    }

Note: the expected output here isn't correct. The window bounds (WSTART and WEND) should appear in the values, but I was going to add them later.

@agavra just to confirm: this is no longer an issue on master, and the ask is just to fix it on CP 5.5 / ksql 0.7?

I believe so. I haven't tried the full QTT test (including the window bounds in the select), but Hojjat's example works on master.

Hmm... let me take a look.

@agavra, the test case you added, where the window bounds are in the projection, doesn't work on master, but it doesn't work on 5.5 or 5.4 either. So this is not a regression, just a bug. Breaking it out into a new bug: #5931.

The version without the window bounds in the projection does work in 5.4, 6.0 and master, and fails in 5.5 (as @agavra says above).

Fixed in upcoming 5.5.2 and 6.0.0 releases and 0.12 community release.
