KSQL: Stream is created successfully, but SELECT query returns no data

Created on 28 Jun 2018 · 9 Comments · Source: confluentinc/ksql

KSQL version: 5.0.x

I successfully created a stream from a topic whose data was imported by debezium-connectors-mysql.
But when I run a SELECT query, no data is returned, and ksql-streams.log reports an Avro deserialization error. The details are below:

[2018-06-28 15:45:32,965] WARN task [0_0] Skipping record due to deserialization error. topic=[ssq_mixbig.ssqsign_mixbig1.signer] partition=[0] offset=[726] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.connect.errors.DataException: Cannot deserialize type int16 as type int32
        at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlValue(ConnectDataTranslator.java:64)
        at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlStruct(ConnectDataTranslator.java:122)
        at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlValue(ConnectDataTranslator.java:84)
        at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlRow(ConnectDataTranslator.java:43)
        at io.confluent.ksql.serde.avro.AvroDataTranslator.toKsqlRow(AvroDataTranslator.java:50)
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:45)
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:931)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:826)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)

Does KSQL support the Avro schema types INT8 and INT16? TABLE and STREAM only offer the INTEGER datatype.

bug

All 9 comments

Can you share the schema of the records you are trying to deserialize?

This is a bug in our serde translating between the connect struct and the ksql struct. Should have a fix ready soon.

@apurvam The schema I am trying to deserialize is below:

{
"subject": "ssq_mixbig.ssqsign_mixbig1.signer-value",
"version": 1,
"id": 6,
"schema": "{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"ssq_mixbig.ssqsign_mixbig1.signer\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"contract_id\",\"type\":\"long\"},{\"name\":\"signer_user_id\",\"type\":\"long\"},{\"name\":\"sign_status\",\"type\":{\"type\":\"int\",\"connect.type\":\"int16\"}},{\"name\":\"ctime\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"utime\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}}],\"connect.name\":\"ssq_mixbig.ssqsign_mixbig1.signer.Value\"}"
}

In this schema, the sign_status field has connect.type int16.
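For anyone who wants to check whether their own topic is affected, here is a minimal sketch that scans a Schema Registry response for fields whose connect.type is int8 or int16. The response below is a trimmed copy of the one above (only two of the six fields are kept), and the helper name narrow_int_fields is made up for illustration. It only looks at top-level record fields with a dict-valued type, so unions and nested records are not handled.

```python
import json

# Trimmed copy of the registry response posted above: only the "id" and
# "sign_status" fields are kept to keep the example short.
REGISTRY_RESPONSE = {
    "subject": "ssq_mixbig.ssqsign_mixbig1.signer-value",
    "version": 1,
    "id": 6,
    "schema": json.dumps({
        "type": "record",
        "name": "Value",
        "namespace": "ssq_mixbig.ssqsign_mixbig1.signer",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "sign_status",
             "type": {"type": "int", "connect.type": "int16"}},
        ],
    }),
}

# Connect integer types narrower than the Avro "int" that carries them.
NARROW_CONNECT_TYPES = {"int8", "int16"}


def narrow_int_fields(registry_response):
    """Return (field name, connect.type) pairs for int8/int16 fields.

    Hypothetical helper: the "schema" value is itself a JSON string,
    so it has to be parsed a second time.
    """
    schema = json.loads(registry_response["schema"])
    hits = []
    for field in schema.get("fields", []):
        field_type = field["type"]
        # Only dict-valued types can carry a connect.type annotation.
        if (isinstance(field_type, dict)
                and field_type.get("connect.type") in NARROW_CONNECT_TYPES):
            hits.append((field["name"], field_type["connect.type"]))
    return hits


print(narrow_int_fields(REGISTRY_RESPONSE))  # [('sign_status', 'int16')]
```

Any field it reports is a candidate for the "Cannot deserialize type int16 as type int32" error shown in the log.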

Thanks @ZhiXingHeYiApple. As @rodesai mentioned above, there is a bug in the way we currently handle int16 and int8 types. It will be fixed shortly.

Need this to be fixed, too. Thanks

Hitting this issue as well.

[2018-10-14 18:32:02,362] WARN task [0_0] Skipping record due to deserialization error. topic=[DOMESTIC_SHIPMENT_DELIVERED] partition=[0] offset=[1217122] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.connect.errors.DataException: DOMESTIC_SHIPMENT_DELIVERED
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:44)
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:203)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:296)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:125)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:235)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:195)
        at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:132)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:44)
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
        at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)

My topic name is DOMESTIC_SHIPMENT_DELIVERED and DomesticShipmentDelivered is my Avro schema. Could that be the issue?
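One possible explanation for the "Subject not found" error, which is not confirmed in this thread: under Schema Registry's default TopicNameStrategy, the value schema of a topic is looked up under the subject "<topic>-value", as with "ssq_mixbig.ssqsign_mixbig1.signer-value" in the first report. A minimal sketch of that check follows. The registered list is a hypothetical registry listing that assumes the schema was registered under its record name, and both helper names are made up for illustration.

```python
def value_subject(topic):
    """Subject name for a topic's value schema under the default
    TopicNameStrategy (assumed here, not confirmed for this setup)."""
    return f"{topic}-value"


def subject_is_registered(topic, registered_subjects):
    """True if the subject derived from the topic appears in the listing."""
    return value_subject(topic) in registered_subjects


# Hypothetical registry contents: the schema registered under its record name
# instead of "<topic>-value".
registered = ["DomesticShipmentDelivered"]

print(value_subject("DOMESTIC_SHIPMENT_DELIVERED"))
# DOMESTIC_SHIPMENT_DELIVERED-value
print(subject_is_registered("DOMESTIC_SHIPMENT_DELIVERED", registered))
# False
```

If the derived subject is missing from the registry's actual subject list, the lookup in the stack trace would fail with error code 40401 regardless of the int16 bug.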

Any update on the fix for int16 datatypes?

This should be fixed in 5.0.1.
