I couldn't figure out from the documentation how to use Pulsar IO connectors.
The page https://pulsar.apache.org/docs/en/io-managing/#configuring-connectors tells you to use YAML config files and links to the "run connectors" section, but that section doesn't mention YAML configs even once.
Also, I couldn't find any information on how to configure SerDe for Pulsar IO. For example, if my application uses protobuf schemas to create messages for a Pulsar topic, how can the available Pulsar IO sink connectors read these messages, deserialize them, and convert them to the format the sink expects?
The only approach I could think of is to do the required ETL with Pulsar Functions: the application writes protobuf messages, a Pulsar Function reads them, converts them to the format the sink expects, and writes them to another topic, and only then does Pulsar IO read those messages.
But having to write the transformed messages to a new topic for every (topic, sink) pair seems like overkill.
P.S. I would like to use external protobuf schemas. In our company, we store the schemas of all our services in a separate repository. For a given service, we compile the needed schemas for the required language and place the resulting stubs into a separate repo, which the target service then uses (via git submodules). So it would be nice to be able to provide compiled protobuf stubs to the Pulsar IO SerDe logic. As far as I understand, this repo provides that functionality for Kafka Connect, but I would love to use Pulsar instead of Kafka.
I couldn't figure out from the documentation how to use Pulsar IO connectors.
Thank you for your feedback. It helps us improve the documentation. We will try to clarify the documentation based on your feedback.
I couldn't find any information on how to configure SerDe for Pulsar IO.
Pulsar IO is more about getting data in and out. Most of the connectors just transfer bytes. Some connectors, such as CDC and JDBC, attempt to deserialize the events using Pulsar's generic schema.
Currently, Pulsar IO doesn't provide the ability to run functions together with connectors. There are open issues looking into that area.
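To make the difference concrete, here is a minimal, stdlib-only sketch of what decoding with a generic schema gives a JDBC-style sink: once a payload is decoded into named fields, the sink can derive column names from them, which a byte-passthrough connector cannot do. The `GenericRowToSql` class and the map-of-fields representation are hypothetical stand-ins, not Pulsar's `GenericRecord` API or the actual JDBC sink code.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration only; not Pulsar's GenericRecord API or the JDBC sink implementation.
final class GenericRowToSql {

    private GenericRowToSql() {}

    // A schema-aware sink sees named fields (modelled here as field name -> value),
    // so it can build a statement from them; a passthrough sink only sees an opaque byte[].
    static String buildInsert(String table, Map<String, Object> fields) {
        StringJoiner columns = new StringJoiner(", ", "(", ")");
        StringJoiner placeholders = new StringJoiner(", ", "(", ")");
        for (String name : fields.keySet()) {
            columns.add(name);
            placeholders.add("?"); // values would be bound via a PreparedStatement, not concatenated
        }
        return "INSERT INTO " + table + " " + columns + " VALUES " + placeholders;
    }
}
```

With a `LinkedHashMap` holding `id` and `name`, `buildInsert("readings", row)` yields `INSERT INTO readings (id, name) VALUES (?, ?)`.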
I would like to use external protobuf schemas:
You can construct a schema instance using Schema.PROTOBUF(<generated-protobuf-class>.class). Then you can use the schema instance in your application to publish protobuf messages to a Pulsar topic.
Assume your generated protobuf class is ProtobufClass. Then you can write a function as follows:
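import org.apache.pulsar.functions.api.*; // Function, Context
public class TestFunction implements Function<ProtobufClass, Void> {
    public Void process(ProtobufClass input, Context context) { return null; } // input arrives already deserialized; add your logic here
}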
When you submit such a function, you can specify --schema-type PROTOBUF. If you want to use your own SerDe, you can specify --custom-serde-inputs.
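For the --custom-serde-inputs route, the class you pass is expected to implement Pulsar's `SerDe<T>` interface (`org.apache.pulsar.functions.api.SerDe`, with `T deserialize(byte[])` and `byte[] serialize(T)`). The sketch below declares a local interface with that same shape so it compiles without Pulsar on the classpath. `SensorReading` and its `DataOutputStream` encoding are hypothetical stand-ins and are NOT the protobuf wire format; with a real generated class, the bodies would roughly become `ProtobufClass.parseFrom(input)` (wrapping the checked `InvalidProtocolBufferException`) and `input.toByteArray()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in with the same shape as org.apache.pulsar.functions.api.SerDe<T>.
interface SerDe<T> {
    T deserialize(byte[] input);
    byte[] serialize(T input);
}

// Hypothetical message type standing in for a protoc-generated class.
record SensorReading(String sensorId, double value) {}

// Hypothetical SerDe using a simple DataOutputStream layout (not protobuf encoding).
final class SensorReadingSerDe implements SerDe<SensorReading> {
    @Override
    public SensorReading deserialize(byte[] input) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(input))) {
            return new SensorReading(in.readUTF(), in.readDouble());
        } catch (IOException e) {
            throw new UncheckedIOException("Malformed SensorReading payload", e);
        }
    }

    @Override
    public byte[] serialize(SensorReading input) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(input.sensorId());   // 2-byte length prefix + modified UTF-8
            out.writeDouble(input.value());   // 8 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

The key property any SerDe must satisfy is that `deserialize(serialize(x))` gives back an equal object, which is what the runtime relies on when it hands decoded values to your function.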
@Inilien I hope my previous comment addressed your questions. If not, please let me know.
@sijie yes, thank you
Regarding the YAML configuration, I found this section of the documentation: the create command accepts a YAML configuration file, which I assume is exactly the file from the example I linked in my first message in the thread. (I don't know why I couldn't find this earlier... sorry.)
It is still unclear whether it is possible to specify custom serialization/deserialization logic for sinks and sources. From your previous answer, I assumed that Pulsar IO doesn't support any SerDe logic, but in source create and sink create I found the --deserialization-classname and --custom-serde-inputs parameters, respectively. Could they be used to serialize data ingested into Pulsar into protobuf format in source connectors, and to deserialize it from protobuf in sink connectors?
I found these --deserialization-classname and --custom-serde-inputs parameters, respectively
You can write your own source and sink to do that. My previous comment meant that most of the (built-in) connector implementations just transfer bytes in and out of Pulsar, so if you are using Pulsar's built-in connectors, you don't need to specify those settings. For example, if you move data from Kafka to Pulsar, the connector just reads bytes from Kafka and writes bytes to Pulsar. Pulsar doesn't interpret the data.
So it depends on whether you are using Pulsar's built-in connectors and how those connectors are implemented. If you plan to write your own connector for protobuf, you can write your own SerDe or use Pulsar's protobuf schema. If you can share more details about what you are planning to do, I'll have a better idea of how to help you.
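The contrast between a built-in byte-passthrough connector and a custom decoding connector can be sketched as follows. `SimpleSink`, `PassThroughSink`, and `DecodingSink` are hypothetical, simplified types; Pulsar's real `Sink<T>` interface also has `open` and `close`, and `write` receives a `Record<T>` rather than a bare value. In a protobuf connector the decoder would wrap something like `ProtobufClass.parseFrom(bytes)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified sink contract (not Pulsar's Sink<T>).
interface SimpleSink<T> {
    void write(T value);
}

// What most built-in connectors do: forward bytes without interpreting them.
final class PassThroughSink implements SimpleSink<byte[]> {
    final List<byte[]> forwarded = new ArrayList<>();

    @Override
    public void write(byte[] payload) {
        forwarded.add(Arrays.copyOf(payload, payload.length)); // no decoding, no schema lookup
    }
}

// What a custom connector can do: decode each payload into a typed object first.
final class DecodingSink<T> implements SimpleSink<byte[]> {
    private final Function<byte[], T> decoder; // e.g. a wrapper around a protobuf parseFrom call
    final List<T> decoded = new ArrayList<>();

    DecodingSink(Function<byte[], T> decoder) {
        this.decoder = decoder;
    }

    @Override
    public void write(byte[] payload) {
        decoded.add(decoder.apply(payload));
    }
}
```

The passthrough sink behaves identically for JSON, Avro, or protobuf payloads, which is why it needs no SerDe settings; only the decoding variant needs to know the message format.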
One of the use cases at hand is to pull data from a Pulsar topic (which consists of protobuf messages) and push it into ElasticSearch.
If I understood you correctly, the built-in ElasticSearch connector won't be able to deserialize protobuf messages, and I should write my own connector for that purpose (probably just by modifying the existing one).
Also, is it correct that I can reuse the KafkaConnect implementation for that purpose? If I am not mistaken, this is how the current implementation of the Debezium connector for Pulsar works: https://github.com/apache/pulsar/blob/master/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
is to pull data from a Pulsar topic (which consists of protobuf messages) and push it into ElasticSearch.
If I understood you correctly, the built-in ElasticSearch connector won't be able to deserialize protobuf messages, and I should write my own connector for that purpose (probably just by modifying the existing one).
Yes, that's correct. You just need to modify the existing one or add a new one based on it.
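One way to picture the modification: assuming the sink you adapt indexes each record's value as a JSON document, the new step is to turn the decoded protobuf message into JSON before indexing. With real generated classes, protobuf-java-util's `JsonFormat.printer().print(message)` does this generically; the stdlib-only sketch below shows the same idea by hand for a hypothetical `LogEvent` stand-in, so the field names and the `ReadingToJson`-style helper are illustrative only.

```java
// Hypothetical decoded message standing in for a protoc-generated class.
record LogEvent(String service, int level, String message) {}

final class LogEventToJson {

    private LogEventToJson() {}

    // Escapes the characters JSON requires escaping inside a string literal.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.append('"').toString();
    }

    // Builds the JSON document body that the modified sink would send to ElasticSearch.
    static String toJson(LogEvent e) {
        return "{\"service\":" + quote(e.service())
            + ",\"level\":" + e.level()
            + ",\"message\":" + quote(e.message()) + "}";
    }
}
```

For example, `toJson(new LogEvent("api", 3, "disk \"full\""))` produces `{"service":"api","level":3,"message":"disk \"full\""}`. Doing this conversion inside the sink avoids the extra intermediate topic per (topic, sink) pair that a separate Pulsar Function would need.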
is it correct that I can reuse the KafkaConnect implementation for that purpose?
Yes, you can.