Provide details of the setup you're running
I am running ksqlDB version 0.11.0. The underlying Kafka broker is version 2.4.1 (from AWS MSK).
Outline your question
I am trying to perform a join between two streams. The data in our Kafka topic is nested JSON, with no key. Here are the steps that I take:
CREATE STREAM topic-stream (payload STRUCT<sf1 STRUCT<sf2 STRUCT<sf3 STRUCT<nestedField1 VARCHAR,nestedField2 VARCHAR, nestedField3 VARCHAR>>>>) WITH (KAFKA_TOPIC = 'topic-name',PARTITIONS = 1, REPLICAS = 1,VALUE_FORMAT= 'JSON');
CREATE STREAM stream1 AS SELECT * FROM topic-stream WHERE payload -> sf1 -> sf2 -> sf3 -> nestedField2 = '{value1}' ;
CREATE STREAM stream2 AS SELECT * FROM topic-stream WHERE payload -> sf1 -> sf2 -> sf3 -> nestedField2 = '{value2}' ;
CREATE STREAM joinStream AS SELECT stream1.payload->sf1->sf2->sf3->nestedField3 FROM stream1 LEFT JOIN stream2 WITHIN 10 seconds ON stream1.payload->sf1->sf2->sf3->nestedField3 = stream2.payload->sf1->sf2->sf3->nestedField3 GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3 HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;
I'm able to execute steps 1 and 2 with no issue, but I get an error on step 3:
The projection contains no value columns.
If I remove the GROUP BY and HAVING clauses and add the rowkey column, I end up with this statement:
CREATE STREAM joinStream AS SELECT rowkey, stream1.payload->sf1->sf2->sf3->nestedField3 FROM stream1 LEFT JOIN stream2 WITHIN 10 seconds ON stream1.payload->sf1->sf2->sf3->nestedField3 = stream2.payload->sf1->sf2->sf3->nestedField3
which is successful.
Any ideas on what I could be doing wrong here, or what the issue is?
Hello @eshil-patel, when we examine this query:
CREATE STREAM joinStream
AS SELECT
stream1.payload->sf1->sf2->sf3->nestedField3
FROM stream1 LEFT JOIN stream2
WITHIN 10 seconds ON
stream1.payload->sf1->sf2->sf3->nestedField3 =
stream2.payload->sf1->sf2->sf3->nestedField3
GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3
HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;
The key for the output is stream1.payload->sf1->sf2->sf3->nestedField3 (which is determined by the GROUP BY clause). That means that when you select _only_ that field and no other, we don't consider that query to have any value columns - it only has "key" columns. If you want a quick workaround, you can add the following to your select:
AS SELECT
stream1.payload->sf1->sf2->sf3->nestedField3,
AS_VALUE(stream1.payload->sf1->sf2->sf3->nestedField3) AS nestedField3InValue
This is different behavior from standard SQL, which doesn't differentiate between key and value columns. If you want the full details, I recommend you skim this blog: https://www.confluent.io/blog/ksqldb-0-10-updates-key-columns/ (specifically the section on aggregations - https://www.confluent.io/blog/ksqldb-0-10-updates-key-columns/#maintaining-compatibility-aggregations).
Hi @agavra, I've tried these two queries following your recommendation:
CREATE STREAM joinStream
AS SELECT
stream1.payload->sf1->sf2->sf3->nestedField3 ,
AS_VALUE(stream1.payload->sf1->sf2->sf3->nestedField3) AS nestedField3InValue
FROM stream1 LEFT JOIN stream2
WITHIN 10 seconds ON
stream1.payload->sf1->sf2->sf3->nestedField3 =
stream2.payload->sf1->sf2->sf3->nestedField3
GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3
HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;
This got me this error:
Non-aggregate SELECT expression(s) not part of GROUP BY: AS_VALUE(stream1_PAYLOAD->sf1->sf2->sf3->nestedField3). Either add the column to the GROUP BY or remove it from the SELECT.
When I added it to the GROUP BY, this is the message I got:
The projection contains no value columns
I'm assuming that when I add it to the GROUP BY, the same thing happens as in the original query, and the projection has only one key column. Any ideas what's happening in the first situation?
Ah, it looks like you're running into a bug that will be fixed in the next release: https://github.com/confluentinc/ksql/issues/5906 and https://github.com/confluentinc/ksql/issues/5967. We had a bug with AS_VALUE when it was used with a grouping clause that was a struct dereference.
You can add an aggregate column (e.g. COUNT(*)) to the select instead as a quick workaround.
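For illustration, the COUNT(*) workaround might look roughly like the sketch below. It is not a verified statement: the names joinTable and match_count are made up for this example, and CREATE TABLE is used on the assumption that an aggregating query in ksqlDB produces a table rather than a stream. The join condition, grouping and HAVING clause are copied from the query in this thread.

```sql
-- Sketch only; joinTable and match_count are hypothetical names.
-- Assumption: a GROUP BY query yields a table, hence CREATE TABLE.
CREATE TABLE joinTable AS
  SELECT
    -- Key column (determined by the GROUP BY clause).
    stream1.payload->sf1->sf2->sf3->nestedField3,
    -- Aggregate column, so the projection has at least one value column.
    COUNT(*) AS match_count
  FROM stream1
    LEFT JOIN stream2 WITHIN 10 SECONDS
    ON stream1.payload->sf1->sf2->sf3->nestedField3 =
       stream2.payload->sf1->sf2->sf3->nestedField3
  GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3
  HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;
```

Since the HAVING clause only keeps groups with a count of 1, match_count would carry little information of its own here; its role in the sketch is only to give the result a value column next to the key.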
@agavra Thanks for that explanation and insight, that answers my question about the query.