KSQL doesn't support Avro schema with reserved keyword in column name

Created on 5 Jul 2018 · 4 comments · Source: confluentinc/ksql

The source topic is populated by a CDC tool (Debezium), which includes metadata about the source in each message, including the table name in a field called table. KSQL (5.0.0-SNAPSHOT as of 05 Jul 2018) errors out on this.

ksql> print 'asgard.demo.CUSTOMERS-raw' FROM BEGINNING;
Format:AVRO
05/07/18 15:19:40 BST, , {"before": null, "after": {"id": 1, "first_name": "Rica", "last_name": "Blaisdell", "email": "[email protected]", "gender": "Female", "club_status": "bronze", "comments": "Universal optimal hierarchy", "create_ts": "2018-07-05T14:17:56Z", "update_ts": "2018-07-05T14:17:56Z"}, "source": {"version": "0.8.0.Beta1", "name": "asgard", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": "demo", "table": "CUSTOMERS", "query": null}, "op": "c", "ts_ms": 1530800375805}
ksql> CREATE STREAM customers_S with (kafka_topic='asgard.demo.CUSTOMERS-raw', value_format='avro');
io.confluent.ksql.parser.exception.ParseFailedException: line 2:504: extraneous input 'TABLE' expecting {'ADD', 'APPROXIMATE', 'AT', 'CONFIDENCE', 'NO', 'SUBSTRING', 'POSITION', 'TINYINT', 'SMALLINT', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'STRUCT', 'VIEW', 'REPLACE', 'GRANT', 'REVOKE', 'PRIVILEGES', 'PUBLIC', 'OPTION', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'GRAPHVIZ', 'LOGICAL', 'DISTRIBUTED', 'TRY', 'SHOW', 'TABLES', 'SCHEMAS', 'CATALOGS', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'TO', 'SYSTEM', 'BERNOULLI', 'POISSONIZED', 'TABLESAMPLE', 'RESCALED', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'WORK', 'ISOLATION', 'LEVEL', 'SERIALIZABLE', 'REPEATABLE', 'COMMITTED', 'UNCOMMITTED', 'READ', 'WRITE', 'ONLY', 'CALL', 'NFD', 'NFC', 'NFKD', 'NFKC', 'IF', 'NULLIF', 'COALESCE', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}

Formatted message:

{
  "before": null,
  "after": {
    "id": 1,
    "first_name": "Rica",
    "last_name": "Blaisdell",
    "email": "[email protected]",
    "gender": "Female",
    "club_status": "bronze",
    "comments": "Universal optimal hierarchy",
    "create_ts": "2018-07-05T14:17:56Z",
    "update_ts": "2018-07-05T14:17:56Z"
  },
  "source": {
    "version": "0.8.0.Beta1",
    "name": "asgard",
    "server_id": 0,
    "ts_sec": 0,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "snapshot": true,
    "thread": null,
    "db": "demo",
    "table": "CUSTOMERS",
    "query": null
  },
  "op": "c",
  "ts_ms": 1530800375805
}

Schema:

{"type":"record","name":"Envelope","namespace":"asgard.demo.CUSTOMERS","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"first_name","type":["null","string"],"default":null},{"name":"last_name","type":["null","string"],"default":null},{"name":"email","type":["null","string"],"default":null},{"name":"gender","type":["null","string"],"default":null},{"name":"club_status","type":["null","string"],"default":null},{"name":"comments","type":["null","string"],"default":null},{"name":"create_ts","type":{"type":"string","connect.version":1,"connect.default":"1970-01-01T00:00:00Z","connect.name":"io.debezium.time.ZonedTimestamp"},"default":"1970-01-01T00:00:00Z"},{"name":"update_ts","type":{"type":"string","connect.version":1,"connect.default":"1970-01-01T00:00:00Z","connect.name":"io.debezium.time.ZonedTimestamp"},"default":"1970-01-01T00:00:00Z"}],"connect.name":"asgard.demo.CUSTOMERS.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":["null","string"],"default":null},{"name":"name","type":"string"},{"name":"server_id","type":"long"},{"name":"ts_sec","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"snapshot","type":[{"type":"boolean","connect.default":false},"null"],"default":false},{"name":"thread","type":["null","long"],"default":null},{"name":"db","type":["null","string"],"default":null},{"name":"table","type":["null","string"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null}],"connect.name":"asgard.demo.CUSTOMERS.Envelope"}
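To see in advance which fields will trip the parser, the Avro schema can be walked recursively and each field name checked against a keyword list. The following is a minimal Python sketch, not part of KSQL: find_reserved_fields is a hypothetical helper, and RESERVED is an assumed, incomplete subset (TABLE from this issue, END and LIMIT from the comments). The parse error gives a hint about which words are safe: tokens it lists as expected, such as ROW, appear to be non-reserved keywords usable as identifiers, while TABLE is missing from that list. Named-type references like "Value" are resolved by short name only, ignoring namespaces.

```python
import json

# Assumed, incomplete set of reserved words: TABLE comes from this issue,
# END and LIMIT from the comments. KSQL's real keyword list is longer.
RESERVED = {"TABLE", "END", "LIMIT"}


def find_reserved_fields(schema, path=(), named=None, visiting=()):
    """Return dotted paths of record fields whose names are reserved words."""
    if named is None:
        named = {}  # short record name -> record schema, for named-type references
    hits = []
    if isinstance(schema, str):
        # Reference to a record defined earlier, e.g. "after": ["null", "Value"].
        # Primitive names such as "int" are not in `named` and are skipped.
        if schema in named and schema not in visiting:
            hits += find_reserved_fields(named[schema], path, named, visiting)
    elif isinstance(schema, list):  # union such as ["null", {...}]
        for branch in schema:
            hits += find_reserved_fields(branch, path, named, visiting)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind == "record":
            named.setdefault(schema["name"], schema)
            inner = visiting + (schema["name"],)  # guards against recursive types
            for field in schema["fields"]:
                field_path = path + (field["name"],)
                if field["name"].upper() in RESERVED:
                    hits.append(".".join(field_path))
                hits += find_reserved_fields(field["type"], field_path, named, inner)
        elif kind == "array":
            hits += find_reserved_fields(schema["items"], path, named, visiting)
        elif kind == "map":
            hits += find_reserved_fields(schema["values"], path, named, visiting)
        elif isinstance(kind, (dict, list)):  # {"type": {...}} wrapper
            hits += find_reserved_fields(kind, path, named, visiting)
    return hits


# Trimmed-down version of the Envelope schema from this issue.
envelope = json.loads("""
{"type": "record", "name": "Envelope", "fields": [
  {"name": "before", "type": ["null", {"type": "record", "name": "Value",
     "fields": [{"name": "id", "type": "int"}]}], "default": null},
  {"name": "after", "type": ["null", "Value"], "default": null},
  {"name": "source", "type": {"type": "record", "name": "Source", "fields": [
     {"name": "db", "type": ["null", "string"], "default": null},
     {"name": "table", "type": ["null", "string"], "default": null}]}},
  {"name": "op", "type": "string"}]}
""")
print(find_reserved_fields(envelope))  # ['source.table']
```

Passing the full schema string from this issue through json.loads and into find_reserved_fields should report the same single path, source.table.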
Labels: avro, bug, data-accessibility

All 4 comments

This also happens when using GoldenGate as a source. There is a workaround in that situation, which is to set includeTableName to false in the Kafka Connect Handler config.

Note that this workaround is specific to GoldenGate.

Workaround for Debezium is to use a Single Message Transform to drop the whole source field:

"transforms": "dropField",
"transforms.dropField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropField.blacklist":"source"  
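What that transform does to each record value can be illustrated with a plain-Python emulation over a dict. This is only a sketch, not Kafka Connect's ReplaceField code; drop_fields is a hypothetical helper and the sample envelope is trimmed from the message in this issue. As far as I know, ReplaceField only acts on top-level fields, so the nested source.table cannot be removed on its own and the whole source struct has to go. Also note that Apache Kafka 2.6 deprecated the blacklist option in favour of exclude (KIP-629), so on newer workers the same setting is spelled transforms.dropField.exclude.

```python
import copy


def drop_fields(value, excluded):
    """Return a copy of a record value without the given top-level fields.

    Hypothetical emulation of ReplaceField$Value with blacklist/exclude set:
    only top-level keys are checked, nested structs are copied unchanged.
    """
    return {k: copy.deepcopy(v) for k, v in value.items() if k not in excluded}


def parse_field_list(config_value):
    """Split a comma-separated field list, as used in the transform config."""
    return {f.strip() for f in config_value.split(",") if f.strip()}


# Trimmed version of the Debezium envelope from this issue.
envelope = {
    "before": None,
    "after": {"id": 1, "first_name": "Rica"},
    "source": {"db": "demo", "table": "CUSTOMERS"},
    "op": "c",
    "ts_ms": 1530800375805,
}

flattened = drop_fields(envelope, parse_field_list("source"))
print(flattened)  # same envelope without the "source" key
```

The trade-off is that the CDC metadata (binlog file, position, db, table) is lost for every consumer of the topic, not just for KSQL.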

I have the same issue when using the jcustenborder/kafka-connect-twitter connector with the Avro converter. In that case the reserved keyword causing the failure is "End".

Same issue with JSON value_format and a column named "Limit". It cannot be escaped with single quotes.
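Single quotes in SQL delimit string literals, not identifiers, which is why they don't help here. The parse error in this issue lists QUOTED_IDENTIFIER and BACKQUOTED_IDENTIFIER among the tokens it accepts, so a double-quoted or backquoted column name may get past the parser; whether KSQL 5.0 then maps such a column correctly onto the JSON field is not confirmed in this thread. The Python sketch below builds a CREATE STREAM statement for a JSON topic, backquoting any column whose name is in an assumed reserved-word set. The helper names, the stream and topic names, and the column types are all hypothetical.

```python
# Assumed, incomplete reserved-word subset; KSQL's real keyword list is longer.
RESERVED = {"TABLE", "END", "LIMIT"}


def quote_identifier(name):
    """Backquote a column name if it is a reserved word.

    Single quotes would turn it into a string literal instead of an identifier.
    Names that themselves contain backquotes are not handled.
    """
    if name.upper() in RESERVED:
        return "`" + name + "`"
    return name


def create_stream_sql(stream, topic, columns):
    """Build a CREATE STREAM statement for a JSON topic from (name, type) pairs."""
    cols = ", ".join(f"{quote_identifier(n)} {t}" for n, t in columns)
    return (f"CREATE STREAM {stream} ({cols}) "
            f"WITH (kafka_topic='{topic}', value_format='JSON');")


print(create_stream_sql("orders_s", "orders", [("id", "BIGINT"), ("Limit", "INTEGER")]))
# CREATE STREAM orders_s (id BIGINT, `Limit` INTEGER) WITH (kafka_topic='orders', value_format='JSON');
```

Backquotes were chosen over double quotes only because both appear in the accepted-token list and backquotes are less likely to be confused with string quoting; either may work, and neither is verified against this KSQL version.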
