Ksql: KSQL LEFT JOIN IS NOT WORKING.

Created on 16 Mar 2018  路  7Comments  路  Source: confluentinc/ksql

Hello Folks,

I have 2 kafka avro topics deal & expense but data had alot of whitespaces to clear that I have created following topic and table with trimmed data.
DEAL_STREAM
EXPENSE_TABLE
ksql> describe EXPENSE_TABLE;

Field | Type

ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)

ksql> describe deal_stream;

Field | Type

ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)

KSQL_COL_2 | VARCHAR(STRING)

When I execute the following Query its giving me null pointer exception.
I tried following queries.
1:
ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
2:
ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 AS KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;

3:

CREATE STREAM deal_expense_trimmed AS SELECT td.KSQL_COL_0 AS KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0 where **td.KSQL_COL_0 IS NOT NULL;**


 Message
----------------------------
 Stream created and running
----------------------------
ksql> Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-115" java.lang.NullPointerException
        at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-116" java.lang.NullPointerException
        at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-113" java.lang.NullPointerException
        at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
Exception in thread "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-114" java.lang.NullPointerException
        at io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
        at org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
        at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)

Most helpful comment

@zamirarif
I made a build with the latest code from the repository.
Looks like the issue should have been fixed.

All 7 comments

Which version are you running? This should be fixed in the latest master.

@rodesai Hi I am using the latest version of KSQL 0.5 and confluent 4.0.0

I got the same error but the latest master works fine for me.

@VladMl Hi Dude,
I am using the 0.5 which was released in February.

I hope you are running the same version or do you mean building the latest code from the repository.

I tried that but its failing due restriction to use the public repositories in our organization.

@zamirarif
I made a build with the latest code from the repository.
Looks like the issue should have been fixed.

This was fixed by https://github.com/confluentinc/ksql/pull/927 .

The patch will be part of the 4.1 release at the end of march / early April.

@apurvam Thanks for the response & closing the ticket.

Was this page helpful?
0 / 5 - 0 ratings