Using the JdbcSinkConnector to write to a MySQL table from a Kafka topic containing Avro data generated by a KSQL CREATE TABLE AS SELECT statement results in the following error:
[2018-01-23 13:44:32,363] ERROR WorkerSinkTask{id=sink_mysql_groupbydevice-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: groupbydevice
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2018-01-23 13:44:32,365] ERROR WorkerSinkTask{id=sink_mysql_groupbydevice-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)
To reproduce, do the following:
$ ./bin/confluent start
$ ./bin/ksql-cli remote http://localhost:8080
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews_kafka_topic_json', value_format='JSON');
ksql> CREATE table user_counts WITH (kafka_topic='groupbydevice', value_format='AVRO') AS SELECT userid, count(*) from pageviews_original group by userid;
Message
---------------------------
Table created and running
---------------------------
$ cat > ~/jdbcsink.json
{
"name": "sink_mysql_groupbydevice",
"config": {
"_comment": "Stream data to file from Kafka topic using JdbcSinkConnector",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"auto.create":"true",
"_comment": " --- JDBC-specific configuration below here --- ",
"_comment": "JDBC connection URL. This will vary by RDBMS. Consult your manufacturer's handbook for more information",
"connection.url": "jdbc:mysql://localhost:3306/demo2?user=root",
"_comment": "Which topic(s) to write data from",
"topics": "groupbydevice"
}
}
$ ./bin/confluent load sink_mysql_groupbydevice -d ~/jdbcsink.json
This tries to read from the groupbydevice topic, written by KSQL in Avro format, and write to a MySQL table using Kafka Connect's JDBC connector. It results in the deserialization exception above.
The issue is that the messages written by KSQL have String keys and Avro values, while Kafka Connect as configured here expects Avro for both keys and values, so the AvroConverter fails on the String key. This can be resolved by adding the following to the Connect job configuration:
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
Note that to write to MySQL you need to add the MySQL JDBC driver to the Connect classpath explicitly. You can get it from here: https://dev.mysql.com/downloads/connector/j/5.1.html
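For reference, here is a sketch of the reproduction config from above with the key.converter override in place. All values are copied from the original jdbcsink.json. The "_comment" entries are dropped for brevity, and the value converter is assumed to stay at whatever the Connect worker already uses (Avro in this setup):

```json
{
  "name": "sink_mysql_groupbydevice",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "auto.create": "true",
    "connection.url": "jdbc:mysql://localhost:3306/demo2?user=root",
    "topics": "groupbydevice"
  }
}
```

Because the task was killed by the exception, the connector would likely need to be reloaded with the updated file (the same ./bin/confluent load command used in the reproduction) before the change takes effect.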
@apurvam Thanks for the response. It helped me resolve my issue.
Hello. This worked for me too, but no data is inserted into my target table in MySQL. Selecting from the table in the KSQL CLI shows that it has data. Do you know how to solve this? Thanks.