Step 1: While trying to convert JSON to Avro, register an Avro schema in the Schema Registry:
$ curl --silent -X GET http://localhost:8081/subjects/user-value/versions/latest | jq .
{
"subject": "user-value",
"version": 1,
"id": 83,
"schema": "{\"type\":\"record\",\"name\":\"user_schema\",\"namespace\":\"com.mynamespace\",\"fields\":[{\"name\":\"USER\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"user_details\",\"fields\":[{\"name\":\"FIRST_NAME\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LAST_NAME\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EMAIL\",\"type\":[\"null\",\"string\"],\"default\":null}]}],\"default\":null},{\"name\":\"IP_ADDRESS\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LOGON_DATE\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
}
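Step 1 only shows the schema after it was registered, not the registration call itself. As a rough sketch, the request could be built like this with the Schema Registry REST API (`POST /subjects/<subject>/versions`). The helper name `build_registration_request` and the abbreviated schema are illustrative assumptions, not what was actually run for this report.

```python
import json
import urllib.request


def build_registration_request(registry_url, subject, schema):
    """Build (but do not send) a POST that registers `schema` under `subject`.

    Hypothetical helper for illustration only.
    """
    # The registry expects the Avro schema as a JSON-encoded string
    # inside a JSON object, hence the double encoding.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


# Abbreviated version of the schema from Step 1 (nested USER record omitted).
user_schema = {
    "type": "record",
    "name": "user_schema",
    "namespace": "com.mynamespace",
    "fields": [
        {"name": "IP_ADDRESS", "type": ["null", "string"], "default": None},
        {"name": "LOGON_DATE", "type": ["null", "string"], "default": None},
    ],
}

request = build_registration_request(
    "http://localhost:8081", "user-value", user_schema
)
# To actually send it against a running registry:
# urllib.request.urlopen(request)
```

The subject `user-value` matches the value schema of the `user` topic used in the steps that follow.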
Step 2: Create a stream that writes Avro to the same topic:
ksql> CREATE STREAM some_stream WITH (VALUE_FORMAT='AVRO',KAFKA_TOPIC='user') AS SELECT * FROM some_source;
Step 3:
Check with the console consumer whether the schema from the registry was used. It was not: the default schema name "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema_USER" was used again. Furthermore, the previous schema was silently overwritten, as the latest version in the registry shows:
$ curl --silent -X GET http://localhost:8081/subjects/user-value/versions/latest | jq .
{
"subject": "user-value",
"version": 2,
"id": 82,
"schema": "{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"USER\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema_USER\",\"fields\":[{\"name\":\"FIRST_NAME\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LAST_NAME\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"EMAIL\",\"type\":[\"null\",\"string\"],\"default\":null}]}],\"default\":null},{\"name\":\"IP_ADDRESS\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"LOGON_DATE\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
}
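The difference between the two registry responses is easiest to see by listing the record names in each schema. The following is a minimal sketch, assuming the response bodies look like the `jq` output above; the function names are hypothetical and the sample schemas are abbreviated to a single nested field.

```python
import json


def record_names(schema, namespace=None):
    """Return the full names of all Avro records found in a parsed schema.

    Only records and unions are walked, which is enough for the schemas
    in this report; arrays and maps are not handled.
    """
    names = []
    if isinstance(schema, list):  # union: inspect every branch
        for branch in schema:
            names += record_names(branch, namespace)
    elif isinstance(schema, dict) and schema.get("type") == "record":
        # Nested records inherit the enclosing namespace unless they set one.
        namespace = schema.get("namespace", namespace)
        name = schema["name"]
        names.append(f"{namespace}.{name}" if namespace else name)
        for field in schema["fields"]:
            names += record_names(field["type"], namespace)
    return names


def latest_record_names(registry_response):
    """Extract record names from a /subjects/<subject>/versions/latest body."""
    # The "schema" field is itself a JSON-encoded string.
    return record_names(json.loads(json.loads(registry_response)["schema"]))


def sample_response(version, schema_id, outer, namespace, inner):
    """Build an abbreviated response shaped like the jq output above."""
    nested = {
        "type": "record",
        "name": inner,
        "fields": [
            {"name": "FIRST_NAME", "type": ["null", "string"], "default": None}
        ],
    }
    schema = {
        "type": "record",
        "name": outer,
        "namespace": namespace,
        "fields": [{"name": "USER", "type": ["null", nested], "default": None}],
    }
    return json.dumps(
        {
            "subject": "user-value",
            "version": version,
            "id": schema_id,
            "schema": json.dumps(schema),
        }
    )


before = sample_response(1, 83, "user_schema", "com.mynamespace", "user_details")
after = sample_response(
    2, 82, "KsqlDataSourceSchema", "io.confluent.ksql.avro_schemas",
    "KsqlDataSourceSchema_USER",
)

print(latest_record_names(before))
print(latest_record_names(after))
```

With these samples, the first call lists the `com.mynamespace` names registered in Step 1 and the second lists the generated `io.confluent.ksql.avro_schemas` names, even though the fields are identical.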
Found in v5.1.0-rc3 of KSQL
+1
There should be a way to define if the CREATE STREAM/TABLE operation should use an existing schema instead of creating a new one every time. Maybe something like this:
ksql> CREATE STREAM some_stream WITH (VALUE_FORMAT='AVRO',KAFKA_TOPIC='user',SCHEMA='some_schema:version') AS SELECT * FROM some_source;
Is there a workaround for this? Perhaps duplicating a topic outside of KSQL and defining a schema on that.
Any update on this? Is there a workaround to use the original Avro schema?
Ran into this issue with ksqlDB 0.12.0. Is there any workaround for this?