Describe the bug
Currently, the Debezium Postgres source produces messages with KeyValueSchema to its topics. Support for the Avro schema type is required.
To Reproduce
Steps to reproduce the behavior:
tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/pulsar-io-debezium-postgres-2.5.0-SNAPSHOT.nar"
schemaType: "avro"
parallelism: 1
configs:
  database.hostname: "localhost"
  database.port: "5432"
  database.user: "hq_gosecure"
  database.password: "postgres"
  database.dbname: "hq_gosecure"
  database.server.name: "dbserver1"
  plugin.name: "wal2json"
  pulsar.service.url: "pulsar://127.0.0.1:6650"
bin/pulsar-admin sources localrun --source-config-file conf/debezium-postgres-source-config.yaml
Expected behavior
The connector should start producing messages with an Avro schema. Instead, I get the following error:
19:15:12.353 [main] INFO org.apache.pulsar.functions.LocalRunner - RuntimeSpawner quit because of
org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroTypeException: Unknown type: K
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at org.apache.pulsar.client.impl.schema.StructSchema.createAvroSchema(StructSchema.java:136) ~[pulsar-client-original-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.client.impl.schema.StructSchema.parseSchemaInfo(StructSchema.java:149) ~[pulsar-client-original-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.client.impl.schema.AvroSchema.of(AvroSchema.java:90) ~[pulsar-client-original-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:144) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:189) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:209) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$0(TopicSchema.java:65) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at java.util.HashMap.computeIfAbsent(HashMap.java:1127) ~[?:1.8.0_201]
at org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:65) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.functions.sink.PulsarSink.initializeSchema(PulsarSink.java:327) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.functions.sink.PulsarSink.open(PulsarSink.java:255) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:787) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:213) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:244) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroTypeException: Unknown type: K
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
... 15 more
Caused by: org.apache.avro.AvroTypeException: Unknown type: K
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:292) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:646) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:81) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
... 15 more
@raghavi92 In Debezium, we have to use KeyValueSchema; inside the KeyValueSchema, the key and the value can each be of Avro type.
Please refer to KeyValueSchema.java for more details.
@tuteng Could we add some docs on how to configure and use schemas in the Debezium connector?
@jiazhai Yes, I went through the KeyValueSchema code. As mentioned in the issue, I set "schemaType" to "avro" in my source config YAML file, but the Avro type exception is still thrown at this line:
https://github.com/apache/pulsar/blob/d3cb108590713735ef1b23ff61997bf68e0fd7f3/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java#L62
I guess it's because of this issue in the Avro library itself.
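For illustration, here is a minimal sketch of why a reflection-based schema builder could end up with "Unknown type: K". It is not Pulsar or Avro code: `Pair<K, V>` is a hypothetical stand-in for a generic key/value holder, and `describeFieldType` is a helper invented for this example. The stack trace above shows Avro's ReflectData building a schema field by field; if a field is declared with a type parameter, reflection reports a type variable rather than a concrete class, which is likely what surfaces as the exception.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

class UnknownTypeDemo {
    // Hypothetical generic holder, standing in for a class like KeyValue<K, V>.
    static class Pair<K, V> {
        K key;
        V value;
    }

    // Hypothetical non-generic class, for comparison.
    static class Concrete {
        String name;
    }

    // Describes how reflection sees the declared type of the named field.
    static String describeFieldType(Class<?> cls, String fieldName) {
        try {
            Field field = cls.getDeclaredField(fieldName);
            Type type = field.getGenericType();
            if (type instanceof TypeVariable) {
                // No concrete class is available here, only the parameter name.
                return "type variable " + type.getTypeName();
            }
            return "concrete type " + type.getTypeName();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFieldType(Pair.class, "key"));      // type variable K
        System.out.println(describeFieldType(Concrete.class, "name")); // concrete type java.lang.String
    }
}
```

If this reading is right, setting schemaType to "avro" alone cannot work for a generic key/value record, because the concrete key and value types are not recoverable from the class by reflection.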
@raghavi92 This requires configuring key.converter and value.converter for the Debezium connector. The default converter is the JSON converter, org.apache.kafka.connect.json.JsonConverter.
@jiazhai I tried that as well. It looks like the Avro converter is not supported yet? The Avro converter from io.confluent uses Confluent's own schema registry, so it cannot be used with Pulsar.
@raghavi92 It seems the related jar needs to be included in the pom.xml file of pulsar-debezium to support it.
@tuteng will try this and help provide a doc on how to do it.
This involves converting the Avro schema stored in the Kafka registry into data that Pulsar can read. We will provide a fix for this issue.