ClickHouse: Avro Schema Evolution Support

Created on 26 Jun 2020 · 7 Comments · Source: ClickHouse/ClickHouse

I use the AvroConfluent data format with a schema registry to consume Kafka events into ClickHouse.

  • currently, Avro schemas are cached once resolved
  • after evolving the Avro schema (adding fields), ClickHouse doesn't fetch the new schema and fails with an error when selecting rows from the Kafka table
  • recreating the Kafka table, changing format_avro_schema_registry_url, and restarting the ClickHouse server don't refresh the cache
  • if I create the same table on another Kafka topic with the same format, everything works correctly, so I guess the schema cache works by schema name
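
For reference, a minimal sketch of the kind of setup described above, sent over ClickHouse's HTTP interface with Python `requests`. The database, table, column, topic, broker, and consumer group names are all assumptions, not taken from this issue, and depending on the ClickHouse version `format_avro_schema_registry_url` may need to be configured at the user/profile level (e.g. in users.xml) rather than per table.

```python
# Hypothetical reproduction of the setup: a Kafka engine table reading AvroConfluent
# messages. All names below are made up for illustration.
import requests

CLICKHOUSE_URL = "http://localhost:8123/"  # default ClickHouse HTTP interface

ddl = """
CREATE TABLE ga.payments_queue
(
    field1   Int64,
    field2   Int64,
    brand_id Int64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka-broker:9092',
         kafka_topic_list  = 'payments',
         kafka_group_name  = 'clickhouse-payments',
         kafka_format      = 'AvroConfluent'
"""

# NOTE (assumption): format_avro_schema_registry_url is typically set at the
# user/profile level (users.xml) so Kafka tables pick it up and it survives restarts.
resp = requests.post(CLICKHOUSE_URL, data=ddl, params={"user": "default"})
resp.raise_for_status()
```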

As I understand these lines of code and the comments in the PR, fetching a new Avro schema should work automatically:
https://github.com/ClickHouse/ClickHouse/pull/8953/files/927e572d39432d22ae96e087674a5124c6e2931b#diff-4c0061c616a674630b9e2e74706e6255R61
https://github.com/ClickHouse/ClickHouse/pull/8571

AvroConfluent only caches schemas per instance of an InputFormat. This means the Schema Registry will be queried each time a batch from Kafka is processed.
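
To illustrate what "cached per instance of an InputFormat" means, here is a toy Python sketch (not ClickHouse's actual code): each format instance keeps its own id → schema map and asks the registry only on a miss, so a schema id the instance has never seen is always fetched.

```python
# Toy model of the caching behaviour described above -- NOT ClickHouse source code.
from typing import Callable, Dict


class ToyAvroConfluentFormat:
    """Mimics a per-InputFormat schema cache keyed by schema id."""

    def __init__(self, fetch_from_registry: Callable[[int], dict]):
        self._fetch = fetch_from_registry          # e.g. an HTTP call to the registry
        self._schemas_by_id: Dict[int, dict] = {}  # cache lives only in this instance

    def schema_for(self, schema_id: int) -> dict:
        if schema_id not in self._schemas_by_id:
            # Cache miss: an id this instance has not seen yet, including ids
            # registered after the instance was constructed.
            self._schemas_by_id[schema_id] = self._fetch(schema_id)
        return self._schemas_by_id[schema_id]
```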

Labels: comp-formats, question, question-answered


All 7 comments

These are parts of my error logs; I hope they help:

```
2020.06.26 15:13:30.824286 [ 269 ] {} DynamicQueryHandler: Code: 8, e.displayText() = DB::Exception: Field brand_id not found in Avro schema: While executing SourceFromInputStream, Stack trace (when copying this message, always include the lines below):

  1. Poco::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int) @ 0x10406ef0 in /usr/bin/clickhouse
  2. DB::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int) @ 0x8ff88ad in /usr/bin/clickhouse
  3. ? @ 0xdb32ec7 in /usr/bin/clickhouse
  4. DB::AvroConfluentRowInputFormat::getOrCreateDeserializer(unsigned int) @ 0xdb30fd8 in /usr/bin/clickhouse
  5. DB::AvroConfluentRowInputFormat::readRow(std::__1::vector::mutable_ptr, std::__1::allocator::mutable_ptr > >&, DB::RowReadExtension&) @ 0xdb313bd in /usr/bin/clickhouse
  6. DB::IRowInputFormat::generate() @ 0xdb4b9c1 in /usr/bin/clickhouse

    [ 248 ] {} DynamicQueryHandler: Code: 8, e.displayText() = DB::Exception: Field brand_id not found in Avro schema: While executing SourceFromInputStream, Stack trace (when copying this message, always include the lines below):

While executing SourceFromInputStream (version 20.4.2.9 (official build)) (from ...:57874) (in query: SELECT * FROM ga.payments_s LIMIT 100 FORMAT TabSeparatedWithNamesAndTypes;), Stack trace (when copying this message, always include the lines below):

  1. Poco::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int) @ 0x10406ef0 in /usr/bin/clickhouse
  2. DB::Exception::Exception(std::__1::basic_string, std::__1::allocator > const&, int) @ 0x8ff88ad in /usr/bin/clickhouse
  3. ? @ 0xdb32ec7 in /usr/bin/clickhouse
  4. DB::AvroConfluentRowInputFormat::getOrCreateDeserializer(unsigned int) @ 0xdb30fd8 in /usr/bin/clickhouse
  5. DB::AvroConfluentRowInputFormat::readRow(std::__1::vector::mutable_ptr, std::__1::allocator::mutable_ptr > >&, DB::RowReadExtension&) @ 0xdb313bd in /usr/bin/clickhouse
  6. DB::IRowInputFormat::generate() @ 0xdb4b9c1 in /usr/bin/clickhouse
  7. DB::ISource::work() @ 0xdae25fb in /usr/bin/clickhouse
  8. ? @ 0xd7ee7dd in /usr/bin/clickhouse
  9. DB::KafkaBlockInputStream::readImpl() @ 0xd7ef3b8 in /usr/bin/clickhouse
  10. DB::IBlockInputStream::read() @ 0xce37e0d in /usr/bin/clickhouse
  11. DB::SourceFromInputStream::generate() @ 0xdd23e84 in /usr/bin/clickhouse
  12. DB::ISource::work() @ 0xdae25fb in /usr/bin/clickhouse
  13. DB::SourceFromInputStream::work() @ 0xdd23d5d in /usr/bin/clickhouse
  14. ? @ 0xdb0b9a1 in /usr/bin/clickhouse
  15. DB::PipelineExecutor::executeSingleThread(unsigned long, unsigned long) @ 0xdb0f95d in /usr/bin/clickhouse
  16. DB::PipelineExecutor::executeImpl(unsigned long) @ 0xdb11a98 in /usr/bin/clickhouse
  17. DB::PipelineExecutor::execute(unsigned long) @ 0xdb11c65 in /usr/bin/clickhouse
  18. DB::executeQuery(DB::ReadBuffer&, DB::WriteBuffer&, bool, DB::Context&, std::__1::function, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&, std::__1::basic_string, std::__1::allocator > const&)>) @ 0xd53be0b in /usr/bin/clickhouse
```

/cc @oandrew

@SergeyIBM47
Schema id and schema are immutable.
Schemas are cached by their id.
Each Kafka message contains its schema id, which is used to find the schema in the registry.
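
As a concrete illustration of that framing (based on the standard Confluent wire format, not on anything shown in this thread): each AvroConfluent message value starts with a magic byte 0x00 followed by a 4-byte big-endian schema id, which can be resolved against the registry's GET /schemas/ids/{id} endpoint. The registry URL below is an assumption.

```python
# Sketch: extract the schema id from one Confluent-framed Kafka message value
# and resolve it in the Schema Registry.
import struct
import requests


def schema_id_of(message_value: bytes) -> int:
    # Confluent wire format: 1 magic byte (0x00) + 4-byte big-endian schema id + Avro payload.
    magic, schema_id = struct.unpack(">bI", message_value[:5])
    if magic != 0:
        raise ValueError("not a Confluent-framed Avro message")
    return schema_id


def fetch_schema(registry_url: str, schema_id: int) -> str:
    # Schemas are immutable, so the same id always resolves to the same schema.
    resp = requests.get(f"{registry_url}/schemas/ids/{schema_id}")
    resp.raise_for_status()
    return resp.json()["schema"]  # the Avro schema as a JSON string


# Example usage (hypothetical message bytes):
# sid = schema_id_of(raw_value)
# print(fetch_schema("http://schema-registry:8081", sid))
```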

Most likely you get this error because you are trying to read old messages (with the old schema) that do not have the new field in their schema.
e.g. your topic probably looks like this:

```
offset, value (schema_id, values)
0, (1, field1=1, field2=1)
1, (1, field1=2, field2=2)
2, (1, field1=3, field2=3)
3, (2, field1=4, field2=4, brand_id=123)
4, (2, field1=5, field2=5, brand_id=123)
```

You could try setting your consumer group offsets such that they don't attempt to consume old messages (e.g. offset=3 for the example above)
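
One way to do such a reset externally, sketched with the confluent_kafka Python client; the broker, topic, partition, and group names are assumptions, and the ClickHouse consumers of this group should be stopped (e.g. by detaching the Kafka table) while the offsets are committed.

```python
# Sketch: commit offset 3 for the consumer group used by the Kafka table,
# so old messages (with the old schema id) are skipped. All names are hypothetical.
from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "kafka-broker:9092",
    "group.id": "clickhouse-payments",   # must match kafka_group_name of the table
    "enable.auto.commit": False,
})

# Commit the desired starting offset per partition (here: partition 0, offset 3,
# matching the example above). The group resumes from these offsets.
consumer.commit(offsets=[TopicPartition("payments", 0, 3)], asynchronous=False)
consumer.close()
```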

p.s.
same as https://github.com/ClickHouse/ClickHouse/issues/9822#event-3159128858

> You could try setting your consumer group offsets such that they don't attempt to consume old messages (e.g. offset=3 for the example above)

Maybe we can allow using a default value (via a special setting) if the requested column is absent in the Avro message, instead of the exception 'Field brand_id not found in Avro schema'? We could reuse skip_unknown_fields and (I don't remember the setting name, something like "use defaults for missing columns") as supported by JSONEachRow.
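
For comparison, a sketch of how the existing settings behave for JSONEachRow over the HTTP interface (table and data are made up, and the second setting is presumably input_format_defaults_for_omitted_fields): input_format_skip_unknown_fields ignores extra input fields, and input_format_defaults_for_omitted_fields fills missing columns with their defaults, which is the behaviour being proposed here for missing Avro fields.

```python
# Sketch of the analogous JSONEachRow behaviour (hypothetical table and data).
import requests

CH = "http://localhost:8123/"

requests.post(CH, data="""
CREATE TABLE default.demo
(
    field1   Int64,
    brand_id Int64 DEFAULT -1
) ENGINE = MergeTree ORDER BY field1
""").raise_for_status()

# 'extra' is not a column (skipped), 'brand_id' is missing (filled with its DEFAULT).
row = '{"field1": 42, "extra": "ignored"}\n'
requests.post(
    CH,
    params={
        "query": "INSERT INTO default.demo FORMAT JSONEachRow",
        "input_format_skip_unknown_fields": 1,
        "input_format_defaults_for_omitted_fields": 1,
    },
    data=row,
).raise_for_status()
```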

@oandrew
Unfortunately, I am reading new messages from the topic, and they contain the new schema id. I have changed the consumer group id several times and even dropped old schemas from the schema registry and the underlying topic for test purposes. I get the same behaviour as described above: the new schema isn't fetched.

But my workaround of transferring the data as-is from the original topic to a new one works well, and ClickHouse has no problem with my new schema.

Also, I can sink messages into ClickHouse from the original topic (which contains the new schema id) if I don't use the new fields from the new schema, even though the old schema was removed from the schema registry.

I understand that ClickHouse stores the schema in a local persistent cache for performance reasons, but I propose introducing some mechanism to refresh this cache.

@SergeyIBM47
Schemas are only cached in memory, and that's OK because schemas are immutable.
Keep in mind that you can't change the schema of old messages, because Kafka messages are also immutable.

Also, what do you mean by "old schema removed from schema registry"?
If you were referring to DELETE /subjects/(string: subject):

> The Schema Registry API supports deleting a specific schema version or all versions of a subject. The API only deletes the version and the underlying schema ID would still be available for any lookup.
>
> https://docs.confluent.io/current/schema-registry/schema-deletion-guidelines.html
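
A quick way to verify that behaviour against your own registry (a sketch; the registry URL, subject name, version, and schema id are assumptions): delete one subject version, then look the schema up by id, and the id lookup still succeeds.

```python
# Sketch: soft-delete a subject version, then confirm the schema id still resolves.
import requests

REGISTRY = "http://schema-registry:8081"   # assumption
SUBJECT = "payments-value"                 # assumption: <topic>-value subject naming
VERSION = 1                                # assumption
SCHEMA_ID = 1                              # assumption

# Delete one version of the subject...
requests.delete(f"{REGISTRY}/subjects/{SUBJECT}/versions/{VERSION}").raise_for_status()

# ...but the underlying schema id is still resolvable, as the docs above describe.
resp = requests.get(f"{REGISTRY}/schemas/ids/{SCHEMA_ID}")
print(resp.status_code)
print(resp.json())
```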

> Also, I can sink messages into ClickHouse from the original topic (which contains the new schema id) if I don't use the new fields from the new schema, even though the old schema was removed from the schema registry.

Yep, that is an example of forward compatible schema evolution.

If you want to troubleshoot further you can:

  • Set ClickHouse logging level to TRACE (all fetched schemas/ids are logged)
  • Use kafkacat to see schema ids in your messages:
```
kafkacat -b kafka-broker:9092 -C -t topic -o beginning -s 'B I' | cut -d ' ' -f2 | uniq
```
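
If kafkacat isn't available, roughly the same check can be done with the confluent_kafka Python client (the broker, topic, and group names are assumptions): read the topic from the beginning and print each distinct schema id found in the Confluent message header.

```python
# Sketch: print the distinct schema ids present in a topic, similar to the kafkacat
# pipeline above. Broker/topic/group names are hypothetical.
import struct
from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

consumer = Consumer({
    "bootstrap.servers": "kafka-broker:9092",
    "group.id": "schema-id-inspector",   # throwaway group, not the ClickHouse one
    "enable.auto.commit": False,
})
consumer.assign([TopicPartition("payments", 0, OFFSET_BEGINNING)])

seen = set()
while True:
    msg = consumer.poll(timeout=5.0)
    if msg is None:        # no more messages within the timeout
        break
    if msg.error():
        continue
    value = msg.value()
    if value and len(value) >= 5 and value[0] == 0:   # Confluent wire format
        _, schema_id = struct.unpack(">bI", value[:5])
        if schema_id not in seen:
            seen.add(schema_id)
            print("schema id:", schema_id)

consumer.close()
```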

@filimonov That sounds good to me. The only minor issue is that InputFormats have no access to the default column values from the table definition.

A few days later my old stream began working without any change, but in other environments I saw the same behaviour even though I re-set up the stream with a new consumer id. After investigating the ClickHouse server config file, I found a misconfiguration in my ClickHouse Kafka consumer config that caused old messages to be read with the new consumer group id.

Thank you, guys. Now I see that this behaviour applies only to my custom setup and that I should resolve the issue in my underlying infrastructure. Considering that #12007 helps to troubleshoot this kind of thing, I'm closing the issue. Many thanks!
