Ksql: Understanding the error message outputted by ksql

Created on 25 Sep 2020  路  4Comments  路  Source: confluentinc/ksql

Provide details of the setup you're running
I am running ksqldb version 0.11.0. The Kafka broker version underneath is 2.4.1 (from AWS MSK).

Outline your question
I am trying to perform a join between two streams. Our data format in the Kafka topic is nested json, with no key. Here are the steps that I take:

  1. Create a stream from the original data source, with all the columns that I need
CREATE STREAM topic-stream (payload STRUCT<sf1 STRUCT<sf2 STRUCT<sf3 STRUCT<nestedField1 VARCHAR,nestedField2 VARCHAR, nestedField3 VARCHAR>>>>) WITH (KAFKA_TOPIC = 'topic-name',PARTITIONS = 1, REPLICAS = 1,VALUE_FORMAT= 'JSON'); 
  1. Create 2 streams from this stream
CREATE STREAM stream1  AS SELECT * FROM topic-stream WHERE payload -> sf1 -> sf2 -> sf3 -> nestedField2 =  '{value1}' ;
CREATE STREAM stream2  AS SELECT * FROM topic-stream WHERE payload -> sf1 -> sf2 -> sf3 -> nestedField2 =  '{value2}' ;
  1. Create a stream that joins on a common field from streams 1 and 2
CREATE STREAM joinStream AS SELECT stream1.payload->sf1->sf2->sf3->nestedField3 FROM stream1 LEFT JOIN stream2 WITHIN 10 seconds ON stream1.payload->sf1->sf2->sf3->nestedField3 = stream2.payload->sf1->sf2->sf3->nestedField3 GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3 HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;

Im able to execute steps 1 and 2 with no issue, but i get an error on step 3 :

The projection contains no value columns.

If i remove the groupBy and Having statements, as well as add in the rowkey column , I have this request:

CREATE STREAM joinStream AS SELECT rowkey, stream1.payload->sf1->sf2->sf3->nestedField3 FROM stream1 LEFT JOIN stream2 WITHIN 10 seconds ON stream1.payload->sf1->sf2->sf3->nestedField3 = stream2.payload->sf1->sf2->sf3->nestedField3

which is successful.

Any ideas on what I could be doing wrong here or what is the issue?

question

All 4 comments

Hello @eshil-patel when we examine this query:

CREATE STREAM joinStream 
  AS SELECT 
     stream1.payload->sf1->sf2->sf3->nestedField3 
  FROM stream1 LEFT JOIN stream2 
  WITHIN 10 seconds ON 
      stream1.payload->sf1->sf2->sf3->nestedField3 = 
      stream2.payload->sf1->sf2->sf3->nestedField3 
  GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3 
  HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;

The key for the output is stream1.payload->sf1->sf2->sf3->nestedField3 (which is determined by the GROUP BY clause). That means that when you select _only_ that field and no other, we don't consider that query to have any value columns - it only has "key" columns. If you want a quick workaround, you can add the following to your select:

AS SELECT 
   stream1.payload->sf1->sf2->sf3->nestedField3,  
   AS_VALUE(stream1.payload->sf1->sf2->sf3->nestedField3) AS nestedField3InValue

This is different behavior from standard SQL, which doesn't differentiate between key and value columns. If you want the full details, I recommend you skim this blog: https://www.confluent.io/blog/ksqldb-0-10-updates-key-columns/ (specifically the section on aggregations - https://www.confluent.io/blog/ksqldb-0-10-updates-key-columns/#maintaining-compatibility-aggregations).

Hi @agavra , I've tried these two queries following your recommendation

CREATE STREAM joinStream 
  AS SELECT 
     stream1.payload->sf1->sf2->sf3->nestedField3 ,
     AS_VALUE(stream1.payload->sf1->sf2->sf3->nestedField3) AS nestedField3InValue
  FROM stream1 LEFT JOIN stream2 
  WITHIN 10 seconds ON 
      stream1.payload->sf1->sf2->sf3->nestedField3 = 
      stream2.payload->sf1->sf2->sf3->nestedField3 
  GROUP BY stream1.payload->sf1->sf2->sf3->nestedField3 
  HAVING COUNT(stream1.payload->sf1->sf2->sf3->nestedField3) = 1;

This got me this error:

Non-aggregate SELECT expression(s) not part of GROUP BY: AS_VALUE(stream1_PAYLOAD->sf1->sf2->sf3->nestedField3). Either add the column to the GROUP BY or remove it from the SELECT.

When I added it to the groupBy, this is the message I got:

The projection contains no value columns

Im assuming when I add it into the group by, it does what happened originally, and has only one key column. Any ideas whats happening in the first situation ?

Ah it looks like you're running into a bug that will be fixed in the next release https://github.com/confluentinc/ksql/issues/5906 and https://github.com/confluentinc/ksql/issues/5967 - we had a bug with AS_VALUE when used with a grouping clause that was a struct dereference.

You can add a aggregate column (e.g. COUNT(*)) instead into the select as a quick workaround.

@agavra Thanks for that explanation and insight, that answers my question about the query.

Was this page helpful?
0 / 5 - 0 ratings