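The source topic is populated by a CDC tool (Debezium), which includes metadata about the source in each message, including a field named table. KSQL (5.0.0-SNAPSHOT as of 05 Jul 2018) fails to parse the CREATE STREAM statement for this topic, apparently because TABLE is a reserved word.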
ksql> print 'asgard.demo.CUSTOMERS-raw' FROM BEGINNING;
Format:AVRO
05/07/18 15:19:40 BST, , {"before": null, "after": {"id": 1, "first_name": "Rica", "last_name": "Blaisdell", "email": "[email protected]", "gender": "Female", "club_status": "bronze", "comments": "Universal optimal hierarchy", "create_ts": "2018-07-05T14:17:56Z", "update_ts": "2018-07-05T14:17:56Z"}, "source": {"version": "0.8.0.Beta1", "name": "asgard", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": "demo", "table": "CUSTOMERS", "query": null}, "op": "c", "ts_ms": 1530800375805}
ksql> CREATE STREAM customers_S with (kafka_topic='asgard.demo.CUSTOMERS-raw', value_format='avro');
io.confluent.ksql.parser.exception.ParseFailedException: line 2:504: extraneous input 'TABLE' expecting {'ADD', 'APPROXIMATE', 'AT', 'CONFIDENCE', 'NO', 'SUBSTRING', 'POSITION', 'TINYINT', 'SMALLINT', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'STRUCT', 'VIEW', 'REPLACE', 'GRANT', 'REVOKE', 'PRIVILEGES', 'PUBLIC', 'OPTION', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'GRAPHVIZ', 'LOGICAL', 'DISTRIBUTED', 'TRY', 'SHOW', 'TABLES', 'SCHEMAS', 'CATALOGS', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'TO', 'SYSTEM', 'BERNOULLI', 'POISSONIZED', 'TABLESAMPLE', 'RESCALED', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'WORK', 'ISOLATION', 'LEVEL', 'SERIALIZABLE', 'REPEATABLE', 'COMMITTED', 'UNCOMMITTED', 'READ', 'WRITE', 'ONLY', 'CALL', 'NFD', 'NFC', 'NFKD', 'NFKC', 'IF', 'NULLIF', 'COALESCE', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Formatted message:
{
"before": null,
"after": {
"id": 1,
"first_name": "Rica",
"last_name": "Blaisdell",
"email": "[email protected]",
"gender": "Female",
"club_status": "bronze",
"comments": "Universal optimal hierarchy",
"create_ts": "2018-07-05T14:17:56Z",
"update_ts": "2018-07-05T14:17:56Z"
},
"source": {
"version": "0.8.0.Beta1",
"name": "asgard",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"snapshot": true,
"thread": null,
"db": "demo",
"table": "CUSTOMERS",
"query": null
},
"op": "c",
"ts_ms": 1530800375805
}
Schema:
{"type":"record","name":"Envelope","namespace":"asgard.demo.CUSTOMERS","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"int"},{"name":"first_name","type":["null","string"],"default":null},{"name":"last_name","type":["null","string"],"default":null},{"name":"email","type":["null","string"],"default":null},{"name":"gender","type":["null","string"],"default":null},{"name":"club_status","type":["null","string"],"default":null},{"name":"comments","type":["null","string"],"default":null},{"name":"create_ts","type":{"type":"string","connect.version":1,"connect.default":"1970-01-01T00:00:00Z","connect.name":"io.debezium.time.ZonedTimestamp"},"default":"1970-01-01T00:00:00Z"},{"name":"update_ts","type":{"type":"string","connect.version":1,"connect.default":"1970-01-01T00:00:00Z","connect.name":"io.debezium.time.ZonedTimestamp"},"default":"1970-01-01T00:00:00Z"}],"connect.name":"asgard.demo.CUSTOMERS.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mysql","fields":[{"name":"version","type":["null","string"],"default":null},{"name":"name","type":"string"},{"name":"server_id","type":"long"},{"name":"ts_sec","type":"long"},{"name":"gtid","type":["null","string"],"default":null},{"name":"file","type":"string"},{"name":"pos","type":"long"},{"name":"row","type":"int"},{"name":"snapshot","type":[{"type":"boolean","connect.default":false},"null"],"default":false},{"name":"thread","type":["null","long"],"default":null},{"name":"db","type":["null","string"],"default":null},{"name":"table","type":["null","string"],"default":null},{"name":"query","type":["null","string"],"default":null}],"connect.name":"io.debezium.connector.mysql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null}],"connect.name":"asgard.demo.CUSTOMERS.Envelope"}
This also happens when using GoldenGate as a source. There is a workaround in that situation, which is to set includeTableName to false in the Kafka Connect Handler config.
Note that this workaround is specific to GoldenGate.
Workaround for Debezium is to use a Single Message Transform to drop the whole source field:
"transforms": "dropField",
"transforms.dropField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropField.blacklist":"source"
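To make the effect of that transform concrete, here is a minimal Python sketch of what dropping a top-level field does to a message value. It is only an illustration, not the Kafka Connect implementation; drop_fields is a hypothetical helper and the message is a trimmed copy of the one printed above.

```python
def drop_fields(value, blacklist):
    """Return a copy of a message value without the blacklisted top-level fields."""
    return {k: v for k, v in value.items() if k not in blacklist}


# Trimmed version of the Debezium envelope shown earlier in this issue.
message = {
    "before": None,
    "after": {"id": 1, "first_name": "Rica", "club_status": "bronze"},
    "source": {"name": "asgard", "db": "demo", "table": "CUSTOMERS"},
    "op": "c",
    "ts_ms": 1530800375805,
}

trimmed = drop_fields(message, {"source"})
print(sorted(trimmed))  # ['after', 'before', 'op', 'ts_ms']
```

The trade-off is that everything under source (db, table, binlog position and so on) is no longer available downstream.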
I have the same issue when using the jcustenborder/kafka-connect-twitter connector with the AVRO converter. There, the "End" keyword is restricted.
Same issue with JSON value_format and a column named "Limit". It cannot be escaped with single quotes.
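Since several field names have tripped this up so far, a rough way to check a topic's Avro schema before creating a stream is to walk it and list the fields whose names collide with reserved words. The Python sketch below assumes a reserved-word set containing only the three names reported in this thread (TABLE, END, LIMIT), which is certainly incomplete; find_reserved_fields is a hypothetical helper, and the schema is a cut-down version of the one above.

```python
import json

# Assumption: only the names reported in this thread, not KSQL's full list.
RESERVED = {"TABLE", "END", "LIMIT"}


def find_reserved_fields(schema, reserved=RESERVED, path=""):
    """Return dotted paths of record fields whose names are reserved words."""
    hits = []
    if isinstance(schema, list):
        # Avro union: check every branch.
        for branch in schema:
            hits.extend(find_reserved_fields(branch, reserved, path))
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind == "record":
            for field in schema.get("fields", []):
                name = field["name"]
                field_path = f"{path}.{name}" if path else name
                if name.upper() in reserved:
                    hits.append(field_path)
                hits.extend(find_reserved_fields(field["type"], reserved, field_path))
        elif kind == "array":
            hits.extend(find_reserved_fields(schema.get("items"), reserved, path))
        elif kind == "map":
            hits.extend(find_reserved_fields(schema.get("values"), reserved, path))
    # Primitive types and named references (plain strings) have no fields.
    return hits


schema = json.loads("""
{"type": "record", "name": "Envelope", "fields": [
  {"name": "source", "type": {"type": "record", "name": "Source", "fields": [
    {"name": "db", "type": ["null", "string"], "default": null},
    {"name": "table", "type": ["null", "string"], "default": null}]}},
  {"name": "op", "type": "string"}]}
""")

print(find_reserved_fields(schema))  # ['source.table']
```

Named type references such as "Value" in the full schema are not resolved here, so a record that is only referenced by name is checked once, at its definition.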