We tried to use the latest KSQL 0.3 with Schema Registry integration to read Avro data. However, when we try to create a STREAM using:
ksql> CREATE STREAM status WITH (kafka_topic='status', value_format='AVRO');
we get:
CREATE STREAM status WITH (kafka_topic='status ', value_format='AVRO') Could not fetch the AVRO schema from schema registry. Cannot find correct type for avro type: Foo
where Foo is defined as an enum.
Looking through the KSQL code, we see the exception is thrown by io.confluent.ksql.util.SerDeUtil, which doesn't appear to handle enums:
  private static Schema getKSQLSchemaForAvroSchema(org.apache.avro.Schema avroSchema) {
    switch (avroSchema.getType()) {
      case INT:
        return Schema.INT32_SCHEMA;
      case LONG:
        return Schema.INT64_SCHEMA;
      case DOUBLE:
      case FLOAT:
        return Schema.FLOAT64_SCHEMA;
      case BOOLEAN:
        return Schema.BOOLEAN_SCHEMA;
      case STRING:
        return Schema.STRING_SCHEMA;
      case ARRAY:
        return SchemaBuilder.array(getKSQLSchemaForAvroSchema(avroSchema.getElementType()));
      case MAP:
        return SchemaBuilder.map(Schema.STRING_SCHEMA,
            getKSQLSchemaForAvroSchema(avroSchema.getValueType()));
      default:
        throw new KsqlException(String.format("Cannot find correct type for avro type: %s",
            avroSchema.getFullName()));
    }
  }
Are we missing a config or definition parameter, or are Avro enums and other complex types not yet supported by KSQL?
Avro enum is not supported in KSQL. The following is the list of data types that are supported for columns:
BOOLEAN
INTEGER
BIGINT
DOUBLE
VARCHAR (or STRING)
ARRAY
MAP
We badly need this to be fixed.
Any ETA on this?
We are currently working on adding support for nested avro objects. That will be part of the 5.0 release of KSQL scheduled for early August.
If there is enough demand, we may be able to work on adding ENUM support in the following release (5.1, scheduled for November).
@apurvam thanks for the explanation. We really need this feature, so demand from my company is high :))
Could you please explain how difficult it would be to implement this feature? If it is achievable, my colleagues and I are willing to contribute a PR.
@apurvam is the whole feature just a matter of changing those files?
Thanks for your interest, @eshepelyuk . It is more than just those two files.
We have to create an internal KSQL type for the ENUM as well, and then update the DDL language so that you can define columns of ENUM type. For the ENUM in particular, we may want to define the permissible symbols inside KSQL. Something like CREATE STREAM foo (col1 ENUM{"SYMBOL1", "SYMBOL2"} ....
Then we have to think about what happens when ENUMS are mixed with data formats with no native support for ENUMs (like JSON).
Etc.
It shouldn't be too hard, and the above work items are pure top-of-mind brainstorming. But we would need a design doc and a discussion around it before jumping directly to code.
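To make the work items above a little more concrete, here is a minimal sketch of what an internal ENUM column type with a fixed set of permissible symbols might look like, including the check that would be needed when a value arrives as a plain string from a format without native enum support, such as JSON. The class name KsqlEnumTypeSketch and its methods are hypothetical and are not part of KSQL; this only illustrates the idea, not a proposed design.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical illustration only; KSQL has no such class.
final class KsqlEnumTypeSketch {

    // Permissible symbols, in declaration order. Duplicates are collapsed.
    private final Set<String> symbols;

    public KsqlEnumTypeSketch(String... symbols) {
        if (symbols.length == 0) {
            throw new IllegalArgumentException("An ENUM needs at least one symbol");
        }
        this.symbols = Collections.unmodifiableSet(
            new LinkedHashSet<>(Arrays.asList(symbols)));
    }

    // True if the value is one of the declared symbols.
    public boolean accepts(String value) {
        return value != null && symbols.contains(value);
    }

    // For formats such as JSON, where the value is just a string:
    // return it unchanged if it is a declared symbol, otherwise reject it.
    public String coerce(String raw) {
        if (!accepts(raw)) {
            throw new IllegalArgumentException(
                "Value '" + raw + "' is not one of " + symbols);
        }
        return raw;
    }

    public static void main(String[] args) {
        // Mirrors the column definition ENUM{"SYMBOL1", "SYMBOL2"} from the comment above.
        KsqlEnumTypeSketch col1 = new KsqlEnumTypeSketch("SYMBOL1", "SYMBOL2");
        System.out.println(col1.accepts("SYMBOL1")); // true
        System.out.println(col1.accepts("OTHER"));   // false
    }
}
```

Whether an unknown symbol should be rejected, nulled out, or passed through is exactly the kind of question a design doc would have to settle; the sketch simply rejects it.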
I am curious: how do all of you plan to leverage ENUMs inside KSQL?
As part of struct support, we're switching our avro serde implementation to use the Connect converter API. The Connect API translates enums into String fields by writing the enum value into the field as a String. If we decide this is a reasonable translation of avro enums to KSQL, then we would just get this for free.
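A minimal sketch of that translation, assuming a simplified stand-in for the Avro type model rather than the real org.apache.avro or Connect classes. The names AvroKind, AvroType, and toKsqlType are hypothetical; only the mapping mirrors the switch quoted earlier in this thread, with an added ENUM case that is treated as a string.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not the KSQL or Connect implementation.
final class EnumAsStringSketch {

    // Stand-in for Avro's type tags; not the real org.apache.avro API.
    enum AvroKind { INT, LONG, FLOAT, DOUBLE, BOOLEAN, STRING, ENUM, ARRAY, MAP, RECORD }

    static final class AvroType {
        final AvroKind kind;
        final String name;          // used in error messages
        final AvroType nested;      // element type for ARRAY, value type for MAP
        final List<String> symbols; // enum symbols; dropped by the translation below

        private AvroType(AvroKind kind, String name, AvroType nested, List<String> symbols) {
            this.kind = kind;
            this.name = name;
            this.nested = nested;
            this.symbols = symbols;
        }

        static AvroType of(AvroKind kind, String name) {
            return new AvroType(kind, name, null, Collections.<String>emptyList());
        }

        static AvroType enumOf(String name, String... symbols) {
            return new AvroType(AvroKind.ENUM, name, null, Arrays.asList(symbols));
        }

        static AvroType arrayOf(AvroType element) {
            return new AvroType(AvroKind.ARRAY, "array", element, Collections.<String>emptyList());
        }

        static AvroType mapOf(AvroType value) {
            return new AvroType(AvroKind.MAP, "map", value, Collections.<String>emptyList());
        }
    }

    // Maps the simplified Avro type to a KSQL column type name.
    static String toKsqlType(AvroType type) {
        switch (type.kind) {
            case INT:
                return "INTEGER";
            case LONG:
                return "BIGINT";
            case FLOAT:
            case DOUBLE:
                return "DOUBLE";
            case BOOLEAN:
                return "BOOLEAN";
            case STRING:
            case ENUM: // the enum symbol is carried as a plain string value
                return "VARCHAR";
            case ARRAY:
                return "ARRAY<" + toKsqlType(type.nested) + ">";
            case MAP:
                return "MAP<VARCHAR, " + toKsqlType(type.nested) + ">";
            default:
                throw new IllegalArgumentException(
                    "Cannot find correct type for avro type: " + type.name);
        }
    }

    public static void main(String[] args) {
        AvroType foo = AvroType.enumOf("Foo", "A", "B");
        System.out.println(toKsqlType(foo));                   // VARCHAR
        System.out.println(toKsqlType(AvroType.arrayOf(foo))); // ARRAY<VARCHAR>
    }
}
```

Note that the symbol list plays no part in the mapping: with this translation KSQL would see an ordinary string column and would not enforce the set of permitted symbols.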
@apurvam @rodesai I currently have an Avro topic that uses an enum in its Avro schema.
I just want to be able to read this topic from KSQL. Right now I can't, because of a deserialization error.
So it seems what @rodesai described will be completely suitable for my needs.
Ah, in that case the upcoming struct changes will solve your use case, @eshepelyuk!
Thanks @apurvam for the clarification.
This works on the latest KSQL release. As far as I'm concerned, the issue can be closed.
@pronzato @rodesai @apurvam what do you think?
Yep, we should be able to close this now.