KSQL version: 5.0.x
I create a stream successfully from a topic which had imported data by debezium-connectors-mysql.
But when I do select query, there is no data. And ksql-streams.log tell me that AVRO deserialize error, the detail list below:
[2018-06-28 15:45:32,965] WARN task [0_0] Skipping record due to deserialization error. topic=[ssq_mixbig.ssqsign_mixbig1.signer] partition=[0] offset=[726] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.connect.errors.DataException: Cannot deserialize type int16 as type int32
at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlValue(ConnectDataTranslator.java:64)
at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlStruct(ConnectDataTranslator.java:122)
at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlValue(ConnectDataTranslator.java:84)
at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlRow(ConnectDataTranslator.java:43)
at io.confluent.ksql.serde.avro.AvroDataTranslator.toKsqlRow(AvroDataTranslator.java:50)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:45)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:931)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:826)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Dose KSQL support the avro schema type INT8 and INT16锛烼he TABLE and STREAM only INTEGER datatype.
Can you share the schema of the records you are trying to deserialize?
This is a bug in our serde translating between the connect struct and the ksql struct. Should have a fix ready soon.
@apurvam The schema I am trying to deserialize is below:
{
"subject": "ssq_mixbig.ssqsign_mixbig1.signer-value",
"version": 1,
"id": 6,
"schema": "{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"ssq_mixbig.ssqsign_mixbig1.signer\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"contract_id\",\"type\":\"long\"},{\"name\":\"signer_user_id\",\"type\":\"long\"},{\"name\":\"sign_status\",\"type\":{\"type\":\"int\",\"connect.type\":\"int16\"}},{\"name\":\"ctime\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"utime\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}}],\"connect.name\":\"ssq_mixbig.ssqsign_mixbig1.signer.Value\"}"
}
sign_status field connect.type is int16 in this schema.
Thansk @ZhiXingHeYiApple . As @rodesai mentioned above, there is a bug in the way we currently handle int16 and int8 types. It will be fixed shortly.
Need this to be fixed, too. Thanks
Hitting this issue as well.
[2018-10-14 18:32:02,362] WARN task [0_0] Skipping record due to deserialization error. topic=[DOMESTIC_SHIPMENT_DELIVERED] partition=[0] offset=[1217122] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.connect.errors.DataException: DOMESTIC_SHIPMENT_DELIVERED
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:44)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:203)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:296)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:125)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:235)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:195)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:132)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:84)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:44)
at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
My topic name is DOMESTIC_SHIPMENT_DELIVERED and DomesticShipmentDelivered is my Avro schema. Can that be some issue?
Any Update on the fix for int16 datatypes?
This should be fixed in 5.0.1