Describe the bug
When a custom UDTF returns a Connect Struct whose field schemas are non-optional, push queries can work with the results, but persistent queries silently swallow the output.
To Reproduce
Confluent Platform 5.5.0, ksqlDB 0.9.0.
docker-compose.yml for the components:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.9.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    volumes:
      - "./confluent-hub-components/:/usr/share/kafka/plugins/"
      - ./target:/etc/ksqldb/ext
    environment:
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_EXTENSION_DIR: "/etc/ksqldb/ext/"
      # Configuration to embed Kafka Connect support.
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.9.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
pom.xml for the UDTF:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.estruxture</groupId>
  <artifactId>goldberg</artifactId>
  <version>1.0-SNAPSHOT</version>
  <!-- Specify the repository for Confluent dependencies (HTTPS: Maven 3.8.1+ blocks plain-HTTP repositories) -->
  <repositories>
    <repository>
      <id>confluent</id>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
  <!-- Specify build properties -->
  <properties>
    <exec.mainClass>com.estruxture.goldberg.Cal1SplitPanel</exec.mainClass>
    <java.version>1.8</java.version>
    <kafka.version>5.5.0-ccs</kafka.version>
    <kafka.scala.version>2.12</kafka.scala.version>
    <scala.version>${kafka.scala.version}.8</scala.version>
    <confluent.version>5.5.0</confluent.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <!-- Specify the ksqldb-udf dependency -->
  <dependencies>
    <dependency>
      <groupId>io.confluent.ksql</groupId>
      <artifactId>ksqldb-udf</artifactId>
      <version>5.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>2.5.0</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>
  <!-- Build boilerplate -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.7.0</version>
        <configuration>
          <source>8</source>
          <target>8</target>
          <encoding>UTF-8</encoding>
          <compilerArgs>
            <arg>-parameters</arg>
          </compilerArgs>
        </configuration>
      </plugin>
      <!-- Package all dependencies as one jar -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.5.2</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <mainClass>${exec.mainClass}</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>assemble-all</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Create the file src/main/java/io/confluent/developer/TestUdf.java. Note the Schema.INT32_SCHEMA, which should be Schema.OPTIONAL_INT32_SCHEMA to work with ksqlDB:
package io.confluent.developer;

import io.confluent.ksql.function.udf.UdfParameter;
import io.confluent.ksql.function.udtf.Udtf;
import io.confluent.ksql.function.udtf.UdtfDescription;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.util.ArrayList;
import java.util.List;

@UdtfDescription(name = "ksql_struct_test", description = "Test function for debugging.")
public class TestUdf {

    public static final String OUTPUT_STRUCT_DESCRIPTOR = "STRUCT<" +
        "TEST1 INT," +
        "TEST2 INT" +
        ">";

    // Non-optional field schemas: this is what triggers the bug.
    // Using Schema.OPTIONAL_INT32_SCHEMA here makes the persistent query work.
    public static final Schema OUTPUT_SCHEMA = SchemaBuilder.struct().optional()
        .field("TEST1", Schema.INT32_SCHEMA)
        .field("TEST2", Schema.INT32_SCHEMA)
        .build();

    @Udtf(description = "Test function for debugging", schema = OUTPUT_STRUCT_DESCRIPTOR)
    public List<Struct> ksql_struct_test(@UdfParameter final int myValue) {
        // Here, we are testing the output of a function in a structure format.
        // We are simulating the split of one Kafka message into multiple, separate messages
        // each with a potentially complex structure.
        List<Struct> myList = new ArrayList<>();
        myList.add(new Struct(OUTPUT_SCHEMA)
            .put("TEST1", 1)
            .put("TEST2", 2)
        );
        return myList;
    }
}
Build the jar:
mvn clean compile assembly:single
Start Docker Compose:
docker-compose up
Start the CLI:
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
And run:
set 'auto.offset.reset' = 'earliest';
-- Any stream will do to reveal the bug.
create stream s1 (x varchar) with (kafka_topic = 's1', partitions = 1, value_format = 'avro');
insert into s1 (x) values ('a');
Now, use the UDTF in a push query:
SELECT ksql_struct_test(1) AS mytest FROM s1 EMIT CHANGES;
Which yields:
+----------------------------------------------------------------------------------------------------------------+
|MYTEST |
+----------------------------------------------------------------------------------------------------------------+
|{TEST1=1, TEST2=2} |
^CQuery terminated
Now turn that statement into a persistent query:
CREATE STREAM s2 AS SELECT ksql_struct_test(1) AS mytest FROM s1 EMIT CHANGES;
And query:
select * from s2 emit changes;
+-------------------------------------------------------+-------------------------------------------------------+
|ROWKEY |MYTEST |
+-------------------------------------------------------+-------------------------------------------------------+
This one hangs and returns nothing.
Expected behavior
The last select should either return the same data as the push query did, or both should emit some kind of error about bad data types. Nothing is visible in the logs or processing log.
Hey @MichaelDrogalis! I looked into this with current master.
I followed your example, but without Docker. I created a UDF in our codebase, like so:
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

@UdfDescription(name = "AS_STRUCT", description = "Construct a list based on some inputs")
public class AsStruct {

  public static final String OUTPUT_STRUCT_DESCRIPTOR =
      "STRUCT<"
          + "TEST1 INT,"
          + "TEST2 INT"
          + ">";

  public static final Schema OUTPUT_SCHEMA = SchemaBuilder.struct().optional()
      .field("TEST1", Schema.INT32_SCHEMA)
      .field("TEST2", Schema.INT32_SCHEMA)
      .build();

  @Udf(description = "Test function for debugging", schema = OUTPUT_STRUCT_DESCRIPTOR)
  public Struct ksql_struct_test(@UdfParameter final int myValue) {
    return new Struct(OUTPUT_SCHEMA).put("TEST1", 1).put("TEST2", 2);
  }
}
and then created stream S1:
create stream s1 (x varchar) with (kafka_topic = 's1', partitions = 1, value_format = 'json');
and inserted a row into it:
insert into s1 (x) values ('a');
Then, I created stream S2:
CREATE STREAM S2 WITH (KAFKA_TOPIC='S2', PARTITIONS=1, REPLICAS=1) AS SELECT S1.ROWKEY ROWKEY, AS_STRUCT(1) MYTEST FROM S1 S1 EMIT CHANGES;
and printed the output topic of the CSAS:
print S2 from beginning;
which returned nothing. There was also no error or exception in the logs.
But when I then inserted another row into stream S1:
insert into s1 (x) values ('a');
the following exception appeared in the logs:
[2020-05-27 17:16:48,563] ERROR Unhandled exception caught in streams thread _confluent-ksql-default_query_CSAS_S2_0-90d42a04-15db-402f-92b5-565c29d7f036-StreamThread-1. (io.confluent.ksql.util.QueryMetadata:134)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic S2 for task 0_0 due to:
org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:166)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:128)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:88)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:225)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:205)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:157)
at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:56)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:225)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:205)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:157)
at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:56)
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:225)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:205)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:157)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:100)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$2(StreamTask.java:590)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:590)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:851)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:694)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:247)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:282)
at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:251)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:146)
... 27 more
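The innermost frames of this trace (Struct.put calling ConnectSchema.validateValue from ksqlDB's GenericRowSerializer) show where the row is rejected. Connect validates a nested Struct value by checking that its schema equals the schema of the field it is being put into, and a schema's optional flag takes part in that comparison. Assuming ksqlDB derives the sink schema for STRUCT<TEST1 INT, TEST2 INT> with optional fields (consistent with the note above that Schema.OPTIONAL_INT32_SCHEMA works), a Struct built from Schema.INT32_SCHEMA can never match. Below is a minimal, stdlib-only model of that check; FieldSchema, structSchema and validateStructValue are hypothetical names, not the Connect API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Simplified, hypothetical model of Connect's struct-schema check (not the real Connect API).
public class StructSchemaCheck {

    // A field schema is a type name plus an optional flag; both take part in equality,
    // just as the optional flag does for Connect schemas.
    static final class FieldSchema {
        final String type;
        final boolean optional;

        FieldSchema(String type, boolean optional) {
            this.type = type;
            this.optional = optional;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof FieldSchema)) {
                return false;
            }
            FieldSchema other = (FieldSchema) o;
            return type.equals(other.type) && optional == other.optional;
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, optional);
        }
    }

    // Builds the two-field struct schema from the bug report; every field shares one optional flag.
    static Map<String, FieldSchema> structSchema(boolean optionalFields) {
        Map<String, FieldSchema> fields = new LinkedHashMap<>();
        fields.put("TEST1", new FieldSchema("INT32", optionalFields));
        fields.put("TEST2", new FieldSchema("INT32", optionalFields));
        return fields;
    }

    // Mirrors the failing step: the value's struct schema must equal the target field's schema.
    static void validateStructValue(Map<String, FieldSchema> target, Map<String, FieldSchema> value) {
        if (!target.equals(value)) {
            throw new IllegalArgumentException("Struct schemas do not match.");
        }
    }

    public static void main(String[] args) {
        // Assumption: the sink schema derived from STRUCT<TEST1 INT, TEST2 INT> has optional fields.
        Map<String, FieldSchema> sinkSchema = structSchema(true);

        // UDTF built with Schema.OPTIONAL_INT32_SCHEMA: the schemas are equal, so the put succeeds.
        validateStructValue(sinkSchema, structSchema(true));
        System.out.println("optional fields: accepted");

        // UDTF built with Schema.INT32_SCHEMA: only the optional flag differs, yet the check fails.
        try {
            validateStructValue(sinkSchema, structSchema(false));
        } catch (IllegalArgumentException e) {
            System.out.println("required fields: " + e.getMessage());
        }
    }
}
```

Running main prints "optional fields: accepted" and then "required fields: Struct schemas do not match.", which is the same message that surfaces in the trace above. The model leaves out everything else Connect compares (names, versions, defaults, field order) to keep the optional flag in focus.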
However, no error is returned to the CLI. Moreover, if you try
select * from s2 emit changes;
again, there is no error in the CLI.
I am facing a similar issue while trying to write data to a stream.
Infra: Confluent 5.5 on Docker
code.txt
When this code is deployed, the SELECT statement works, but creating a CSAS or CTAS with it does not.
Similarly, while writing a UDAF (similar to COLLECT_LIST) that accepts a struct as input and produces a list of structs, I am facing an error like:

The source code for the UDAF is in
code1.txt
Please let us know when this bug might be fixed.
++ @MichaelDrogalis, please provide your comments on this one.
The strategic fix here is to move away from using the Connect Struct type in our UDF framework, as it has a different type system to ksqlDB, and switch to a type we control and which matches the type system of ksqlDB.
This is certainly something we need to do before going v1.0.
There may be a quicker fix/hack we can put in to work around this issue for now. Or maybe we should just bite the bullet and fix the source of the issue.
Somewhat related to https://github.com/confluentinc/ksql/issues/3624
The strategic fix will be covered under https://github.com/confluentinc/ksql/issues/3413. This ticket will be used for the short-term (i.e. 3 years) fix.
Had a look at this today. Current state of affairs (on 0.11.0) is as follows:
The persistent query results in
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic <topic_name> for task 0_0 due to: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
being logged in the server logs, and no messages are produced to the sink topic. This explains why subsequent push queries on the sink return no data (and no error).
In terms of what we can do to make this better:
The error message "Struct schemas do not match." isn't great. To improve it, we could catch the error in the ksqlDB serializer, check the Struct schema for non-optional types, and show a more specific error message if any are found. It's a bit of a hack, but it could be worthwhile depending on how many users get tripped up by this. This fix would be relatively quick (a couple of days).
@MichaelDrogalis, do you have thoughts on what sort of fix we'd like to pursue? I don't have a sense of how prevalent this particular issue is, and therefore what its priority should be.
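The error-message improvement proposed above could look roughly like this: when the serializer hits "Struct schemas do not match.", walk the offending struct schema, collect every non-optional field (including nested ones), and append a hint. This is a stdlib-only sketch; Node, collect and explain are hypothetical stand-ins for Connect's Schema and for wherever ksqlDB would catch the DataException, and the hint wording is illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed hint: on "Struct schemas do not match.",
// walk the offending struct schema and name every non-optional field.
public class NonOptionalFieldHint {

    // Minimal schema node: an optional flag plus child fields (empty for primitives).
    static final class Node {
        final boolean optional;
        final Map<String, Node> fields = new LinkedHashMap<>();

        Node(boolean optional) {
            this.optional = optional;
        }

        Node field(String name, Node child) {
            fields.put(name, child);
            return this;
        }
    }

    // Depth-first walk collecting dotted paths of fields that are not optional.
    static void collect(Node node, String path, List<String> out) {
        for (Map.Entry<String, Node> e : node.fields.entrySet()) {
            String childPath = path.isEmpty() ? e.getKey() : path + "." + e.getKey();
            if (!e.getValue().optional) {
                out.add(childPath);
            }
            collect(e.getValue(), childPath, out);
        }
    }

    // Returns a more specific message when non-optional fields exist, else the original one.
    static String explain(String original, Node valueSchema) {
        List<String> required = new ArrayList<>();
        collect(valueSchema, "", required);
        if (required.isEmpty()) {
            return original;
        }
        return original + " Non-optional fields found: " + required
            + ". If this struct is returned by a UDF, declare its fields with optional schemas"
            + " (e.g. Schema.OPTIONAL_INT32_SCHEMA).";
    }

    public static void main(String[] args) {
        // Mirrors OUTPUT_SCHEMA from the bug report: optional struct, required INT32 fields.
        Node udfSchema = new Node(true)
            .field("TEST1", new Node(false))
            .field("TEST2", new Node(false));
        System.out.println(explain("Struct schemas do not match.", udfSchema));
    }
}
```

For the schema from the bug report, the message gains "Non-optional fields found: [TEST1, TEST2]" plus the hint. When every field is optional, the original message is returned unchanged, so the check only adds information in the case this issue is about.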
Also note that there is a workaround for this in ksqlDB today: specify optional types for the Struct fields rather than non-optional ones (e.g., Schema.OPTIONAL_INT32_SCHEMA rather than Schema.INT32_SCHEMA). cc @satamrajuveeresh, who mentioned possibly being blocked on this.
Thanks for the breakdown @vcrfxia. It seems like if we want to fix this, we just need to bite the bullet and do it properly. We can kick this out a little further, but we should probably queue it up soon. The issue with the workaround is that it's almost impossible to discover. All you get is a hanging query with no feedback, so there's no way for you to know what the problem is, much less what to do about it.
> The issue with the workaround is that it's almost impossible to discover. All you get is a hanging query with no feedback, so there's no way for you to know what the problem is, much less what to do about it.
Needing to check the processing log to discover processing errors is common across many different types of errors in ksqlDB: inability to deserialize source records, various types of production errors (e.g., record size too large), etc. In my mind, if a query isn't producing the expected results, the processing log is a natural first place to look, so I'm surprised by the statement that this is "almost impossible to discover." Are you suggesting we should think about replacing the processing log as the first step in debugging?
I agree, though, that figuring out what to do about the error (post-discovery) is pretty rough, unless we add the additional hack of checking for non-optional schema types upon hitting a schema incompatibility, in order to improve the error message.
My comment extends from the fact that this error wasn't showing up in the processing log. :) If that's fixed, then we're in a better place.
Opened a PR to add serialization exceptions to the processing log, and throw a custom error message calling out the possibility of serialization exceptions (for struct fields) being caused by non-optional schemas from UDFs: https://github.com/confluentinc/ksql/pull/6084
The long-term fix of switching away from Connect types to remove the possibility of encountering this error will be tracked in https://github.com/confluentinc/ksql/issues/4961 instead.