Ksql: CASE statement support in KSQL

Created on 12 Jan 2018  ·  25 Comments  ·  Source: confluentinc/ksql

Currently, it seems that the CASE statement is not supported by KSQL v0.3. Is it due to be added anytime soon? What other options are there for creating derived columns in KSQL?

enhancement

All 25 comments

Hi @mateen-dar, support for the CASE statement is not being worked on right now, though I think it would be a great addition in the medium term.

What use case do you have that requires it?

I wanted a custom column in my streams: some means of applying transformations to a column in a stream.

We'd use this too - we have a case where one-or-another field will be populated in the source topic JSON, and we want to coalesce those into a single field in a KSQL stream.

Use case 1

I want to use this for data cleansing purposes.

Example input stream is of hostnames extracted from log data. Some need converting to a human-friendly identifier, whilst others do not.

Input data (hostnames):

("U7PG2,f09fc2238301,v3.7.40.6115") 
proxmox01 
("BZ2,dc9fdbec6a10,v3.7.40.6115") 

Desired pseudo-SQL:

SELECT CASE \
  WHEN HOST LIKE '%U7PG%' THEN 'unifi' \
  WHEN HOST LIKE '%BZ2%' THEN 'wireless ap' \
  ELSE HOST \
  END \
FROM INPUT_STREAM;

Expected output:

unifi
proxmox01 
wireless ap

Use case 2

To use KSQL as a way of evolving a schema, populating new columns conditionally based on existing data (i.e. where a simple default would not be correct)

For example, existing topic has a SKU, and a "product department" column needs adding to the topic. New messages will be populated with this at source, but existing data needs to be retrospectively processed, and the "product department" can be derived based on the SKU. For example:

Product | SKU
-|-
Toaster | H1235
Kettle | H1425
Banana | F0192
Apple | F1723
Cat | x1234

CREATE STREAM PRODUCT_V1 (PRODUCT VARCHAR, SKU VARCHAR) WITH …

CREATE STREAM PRODUCT_V2 AS \
SELECT PRODUCT, SKU, CASE \
                     WHEN SKU LIKE 'H%' THEN 'Homeware' \
                     WHEN SKU LIKE 'F%' THEN 'Food' \
                     ELSE 'Unknown' \
                     END AS PROD_DEPT \
FROM PRODUCT_V1;

Product | SKU | Product Department
-|-|-
Toaster | H1235 | Homeware
Kettle | H1425 | Homeware
Banana | F0192 | Food
Apple | F1723 | Food
Cat | x1234 | Unknown

Use case 3

Assigning measures to buckets

Order | Value
-|-
A1 | 100
B2 | 200
C1 | 150

SELECT ORDER, VALUE, CASE \
                    WHEN VALUE < 100 THEN 'Small' \
                    WHEN VALUE BETWEEN 100 AND 190 THEN 'Mid' \
                    WHEN VALUE > 190 THEN 'High' \
                    END AS ORDER_VALUE_RANGE \
FROM ORDER_STREAM;

Order | Value | Order value range
-|-|-
A1 | 100 | Mid
B2 | 200 | High
C1 | 150 | Mid
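
As a rough check of the bucketing logic above, the same searched CASE can be run against a throwaway SQLite table from Python. SQLite is only a stand-in here, not KSQL, and the names `orders`, `order_id` and `value` are assumptions made for the sketch (`ORDER` is a reserved word in SQLite, so the column is renamed). The CASE tests the Value column from the input table.

```python
import sqlite3

# SQLite stands in for KSQL purely to exercise the CASE logic.
# Table and column names are illustrative assumptions, not from the thread.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT, value INTEGER)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [("A1", 100), ("B2", 200), ("C1", 150)],
)

# Searched CASE: the first WHEN whose condition is true decides the result.
# There is no ELSE, so a row matching no branch would get NULL.
bucket_rows = conn.execute(
    """
    SELECT order_id, value,
           CASE
             WHEN value < 100 THEN 'Small'
             WHEN value BETWEEN 100 AND 190 THEN 'Mid'
             WHEN value > 190 THEN 'High'
           END AS order_value_range
    FROM orders
    ORDER BY order_id
    """
).fetchall()

for row in bucket_rows:
    print(row)
```

With the three sample orders this yields Mid, High and Mid, matching the expected table. Because BETWEEN is inclusive, a value of exactly 100 lands in 'Mid' rather than 'Small'.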

Use case 4

Simple application of business logic during ETL

Order | Method Code
-|-
A1 | W
B2 | W
C1 | A

SELECT ORDER, METHOD_CDE, CASE METHOD_CDE \
                          WHEN 'W' THEN 'Website' \
                          WHEN 'A' THEN 'App' \
                          ELSE 'Other' \
                          END AS ORDER_METHOD \
FROM ORDER_STREAM;

Order | Method Code | Order Method
-|-|-
A1 | W | Website
B2 | W | Website
C1 | A | App

Another reiteration of a similar use case to the above, which I've just hit. For a given field, some of the rows have a value; others don't. I want to be able to use a CASE statement to assign a value based on the value in the other field.

Here's the data

ksql> select oui,name from ubnt_user where name is not null;
Raspberr | rpi-01.moffatt.me
 | Fire 01 (Red)
SlimDevi | Squeezebox - Kitchen
Raspberr | rpi-03.moffatt.me
 | cdh57-01-node-01.moffatt.me
Apple | Robin's work iPhone
 | logstash-irc.moffatt.me

And now I want to do something like

```
SELECT NAME, CASE \
WHEN OUI != '' THEN OUI \
WHEN NAME LIKE 'Fire%' THEN 'Amazon Fire' \
WHEN NAME LIKE '%.moffatt.me' THEN 'Home server' \
END AS DEVICE_TYPE \
FROM ubnt_user;
```

Another example: data from users in multiple regions; use CASE to selectively mask it depending on the region:

SELECT ORDER_ID, PRODUCT, \
       CASE REGION WHEN 'EU' THEN NULL ELSE CUSTOMER_NAME END AS CUSTOMER_NAME, \
       CASE REGION WHEN 'EU' THEN NULL ELSE CUSTOMER_EMAIL END AS CUSTOMER_EMAIL, \
       REGION FROM PURCHASES;

A huge portion of what ETL is meant to do is apply business logic to data. The use of CASE in KSQL is a fundamental necessity to accomplish this.

Another use case:

Inbound IoT data, applying a ML scoring model to it (c.f. https://github.com/kaiwaehner/ksql-fork-with-deep-learning-function). Whether scored in KSQL or before is actually irrelevant.
There's an inbound stream of data with a condition against which we want to flag something. Either populating a status column, or a metric. This would often be done to support downstream visualisation (e.g. in Elasticsearch/Kibana). CASE is needed to support this.

For example, inbound data:

eventid | score
-|-
1|10
2|11
3|20
4|11

Desired result (A):

eventid | score | anomaly
-|-|-
1|10|
2|11|
3|20|20
4|11|

Desired result (B):

eventid | score | anomaly_flg
-|-|-
1|10|FALSE
2|11|FALSE
3|20|TRUE
4|11|FALSE

So I want to run a SQL that looks like this:

(A)

SELECT EVENTID, SCORE, 
       CASE 
        WHEN SCORE >= 20 THEN SCORE
        ELSE NULL 
       END AS ANOMALY
FROM INBOUND_STREAM       

(B)

SELECT EVENTID, SCORE, 
       CASE 
        WHEN SCORE >= 20 THEN TRUE
        ELSE FALSE
       END AS ANOMALY_FLG
FROM INBOUND_STREAM       

@rmoff @apurvam
Could you please advise how "Use case 4" can be achieved in KSQL at present, without a CASE statement?

We have keys in a source topic that need to be mapped to values in a destination topic, without storing this key value pair in an additional database / topic.

A user-defined function would not work for us, as modifying a function or writing a new one would require a ksql-server restart.

| Source Topic | Destination Topic|
| -------------- | ------------------ |
| 1 | CREATE |
| 2 | INSERT |
| 3 | UPDATE |
| 4 | DELETE |
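
For reference, the mapping in the table above is what a simple (value-matching) CASE would express. The sketch below runs it on SQLite from Python as a stand-in; it is not KSQL, and the names `source_topic` and `op_code` are assumptions for illustration. An unmapped key (9) is included to show what happens without an ELSE branch.

```python
import sqlite3

# SQLite is a stand-in used only to show the semantics of a simple CASE.
# `source_topic` and `op_code` are hypothetical names for this sketch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_topic (op_code INTEGER)")
conn.executemany(
    "INSERT INTO source_topic VALUES (?)",
    [(1,), (2,), (3,), (4,), (9,)],
)

# Simple CASE: op_code is compared for equality with each WHEN value in turn.
mapped = conn.execute(
    """
    SELECT op_code,
           CASE op_code
             WHEN 1 THEN 'CREATE'
             WHEN 2 THEN 'INSERT'
             WHEN 3 THEN 'UPDATE'
             WHEN 4 THEN 'DELETE'
           END AS operation
    FROM source_topic
    ORDER BY op_code
    """
).fetchall()

for row in mapped:
    print(row)
```

The key 9 has no matching branch, so its `operation` comes back as NULL (`None` in Python); adding an ELSE would supply a default instead.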

@repos-jmp it can't be done currently; that's why we have this issue :) Please do upvote the relevant section as it helps us track demand for the feature.

@rmoff Is there any way to achieve the effect of a CASE statement in KSQL?

@yajundong it can't be done currently; that's why we have this issue :) Please do upvote the relevant section as it helps us track demand for the feature.

Any updates?

@douglarek no update as such, other than that we are aware of the demand for this, and the number of upvotes really does help us gauge the demand for it versus other features.
Stay tuned…

We have a pending PR for this: #2319 . Hopefully this will be available in our 5.2 release.

Would PR #2319 support aggregations on CASE statements like:

SELECT
    store_id
    ,SUM(CASE WHEN do_not_contact_email = 0
            THEN 1 
            ELSE 0 
        END) as 'active_email_member_count'
    ,COUNT(*) as 'active_member_count'                          
FROM
    members
WHERE
    active=1
    AND deleted=0
GROUP BY 
    store_id
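
The pattern in that query is conditional aggregation: the CASE turns each row into 1 or 0 and SUM adds them up per group. A minimal sketch of those semantics follows, run on SQLite from Python as a stand-in rather than on KSQL. The sample rows are invented for illustration, and the aliases are written unquoted here.

```python
import sqlite3

# SQLite stands in for KSQL; the sample rows below are made up for this sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE members ("
    "store_id INTEGER, do_not_contact_email INTEGER, "
    "active INTEGER, deleted INTEGER)"
)
conn.executemany(
    "INSERT INTO members VALUES (?, ?, ?, ?)",
    [
        (1, 0, 1, 0),  # store 1, contactable
        (1, 1, 1, 0),  # store 1, opted out of email
        (1, 0, 0, 0),  # inactive: removed by the WHERE clause
        (2, 0, 1, 0),  # store 2, contactable
        (2, 0, 1, 1),  # deleted: removed by the WHERE clause
    ],
)

# CASE yields 1 for contactable members and 0 otherwise; SUM counts the 1s.
counts = conn.execute(
    """
    SELECT store_id,
           SUM(CASE WHEN do_not_contact_email = 0 THEN 1 ELSE 0 END)
               AS active_email_member_count,
           COUNT(*) AS active_member_count
    FROM members
    WHERE active = 1 AND deleted = 0
    GROUP BY store_id
    ORDER BY store_id
    """
).fetchall()

for row in counts:
    print(row)
```

Store 1 ends up with one contactable member out of two active ones, and store 2 with one out of one.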

Closed by #2319

@apurvam @hjafarpour just to check (and for others hitting this ticket in the future) : Does this mean CASE will be part of 5.2?

@rmoff Yes, it's currently slated for 5.2.

@justin-tomlinson I've just tried this out and yes it does work.

Until there is a real CASE, EXTRACTJSONFIELD can be used for use cases like 4:

SELECT EXTRACTJSONFIELD('{\"W\": \"Website\", \"A\": \"App\"}', '$.' + METHOD_CDE) AS ORDER_METHOD
FROM ORDER_STREAM

The workaround doesn't work for me, because the tokens are cached and parsed only once...

ksql> select extractjsonfield('{\\"0\\": \\"a\\", \\"1\\": \\"b\\"}', '$.' + cast (val as varchar)), '$.' + cast (val as varchar) from x;
a | $.0
a | $.1
a | $.2
a | $.3
a | $.4
a | $.5

See: https://github.com/confluentinc/ksql/blob/51b8f61f17e01617673f2221083f0d4df582f9cf/ksql-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonExtractStringKudf.java#L74
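
To illustrate why every row comes back as `a`, here is a small Python model of the behaviour described above: a path that is parsed on the first call and then reused for every later row. `CachingJsonExtractor` is a hypothetical stand-in written for this sketch; it is not the code in `JsonExtractStringKudf.java`, only an approximation of the effect reported in this comment.

```python
import json


class CachingJsonExtractor:
    """Hypothetical model of an extractor that parses its path only once."""

    def __init__(self):
        self._tokens = None  # cached path tokens, filled on the first call

    def extract(self, json_text, path):
        if self._tokens is None:
            # Only the first row's path is parsed; later paths are ignored.
            stripped = path[2:] if path.startswith("$.") else path
            self._tokens = stripped.split(".")
        node = json.loads(json_text)
        for token in self._tokens:
            node = node.get(token) if isinstance(node, dict) else None
        return node


doc = '{"0": "a", "1": "b"}'
extractor = CachingJsonExtractor()

# The path changes per row ($.0, $.1, ...), but the cached tokens do not.
results = [extractor.extract(doc, "$." + str(val)) for val in range(6)]
print(results)  # ['a', 'a', 'a', 'a', 'a', 'a']
```

Because the tokens from `$.0` are reused, a per-row path such as `'$.' + METHOD_CDE` cannot work as a lookup under this behaviour.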


@the4thamigo-uk thank you for clarifying that. I've removed my assumption.
