The schema-registry is normally protected.
The confluent-kafka settings are:
How do I use the Confluent SchemaRegistry?
I get a 401 (authentication error) when I follow this example.
Thank you for your detailed feedback. We are looking into the issue, and we will respond when we have more information.
We have assigned the issue to the content author. They will evaluate and update the document as appropriate.
Hi @abij. Thank you for bringing this to our attention. We are reviewing the article.
Hi @abij - this feature is not currently supported. Please submit your product feedback here: https://feedback.azure.com/forums/909463-azure-databricks
@abij did you find a way to work around this successfully? We are currently in the same situation.
Well, we decided not to use the Avro-encoded records; we just validate them and use JSON instead.
As a proof of concept I have developed a working solution.
Note that there are some rough edges...
// Install these packages to cluster using Maven first:
// Coordinates: io.confluent:kafka-avro-serializer:5.4.1, Repository: https://packages.confluent.io/maven/
// Coordinates: org.apache.kafka:kafka-clients:5.4.1-ce Repository: https://packages.confluent.io/maven/
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import io.confluent.kafka.schemaregistry.client._
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.avro.SchemaConverters
import java.util.HashMap
val topic = ""
val kafka_bootstrap = ""
val kafka_jaas_conf = ""
val schema_url = ""
val schema_client_id = ""
val schema_client_secret = ""
class MyAvroDeserializer extends AbstractKafkaAvroDeserializer with Serializable {
  def this(client: SchemaRegistryClient) {
    this()
    this.schemaRegistry = client
  }

  // Deserialize Confluent wire-format bytes into a GenericRecord,
  // then render the record as a JSON string.
  override def deserialize(bytes: Array[Byte]): String = {
    val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
    genericRecord.toString
  }

  // Fetch the latest value schema for the topic from the registry
  // and convert it to a Spark StructType.
  def getValueSchema(topic: String): StructType = {
    val metadata = DeserializerWrapper.client.getLatestSchemaMetadata(topic + "-value")
    val avroSchema = DeserializerWrapper.client.getBySubjectAndId(topic + "-value", metadata.getId)
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
  }
}
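The `topic + "-value"` lookups above follow Confluent's default TopicNameStrategy: the value schema of a topic is registered under the subject `<topic>-value` (and the key schema under `<topic>-key`). A minimal sketch of that convention (the object and method names here are mine, not part of the client API):

```scala
object SubjectNames {
  // Confluent's default TopicNameStrategy: value schemas live under
  // "<topic>-value", key schemas under "<topic>-key".
  def valueSubject(topic: String): String = topic + "-value"
  def keySubject(topic: String): String = topic + "-key"
}

println(SubjectNames.valueSubject("orders")) // orders-value
println(SubjectNames.keySubject("orders"))   // orders-key
```

If the producer uses a different subject-name strategy (e.g. RecordNameStrategy), the lookup in `getValueSchema` has to be adapted accordingly.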
object DeserializerWrapper extends Serializable {
  // @transient lazy: the (non-serializable) registry client is built
  // independently on each executor instead of being shipped from the driver.
  @transient lazy val client = new CachedSchemaRegistryClient(
    schema_url, 128,
    new HashMap[String, String]() {
      put("basic.auth.credentials.source", "USER_INFO")
      put("basic.auth.user.info", schema_client_id + ":" + schema_client_secret)
    }
  )

  @transient lazy val deserializer = new MyAvroDeserializer(client)

  val deserialize = udf((bytes: Array[Byte]) =>
    DeserializerWrapper.deserializer.deserialize(bytes)
  )
}
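For context on the 401: with `basic.auth.credentials.source = USER_INFO`, the registry client sends the two credential values as an HTTP Basic `Authorization` header on every request. A minimal sketch of that encoding (the `authHeader` helper is my illustration, not a method of the client):

```scala
import java.util.Base64
import java.nio.charset.StandardCharsets

object BasicAuthSketch {
  // Roughly what the schema-registry client puts on the wire:
  // "Basic " + base64("<client-id>:<client-secret>")
  def authHeader(id: String, secret: String): String =
    "Basic " + Base64.getEncoder.encodeToString(
      (id + ":" + secret).getBytes(StandardCharsets.UTF_8))
}

println(BasicAuthSketch.authHeader("my-id", "my-secret"))
```

So a 401 usually means these two values never reached the client config, or the id/secret pair is wrong.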
val kafka_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_bootstrap)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.jaas.config", kafka_jaas_conf)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .withColumn("message", DeserializerWrapper.deserialize($"value"))
// Note: message is a String holding the JSON (deserialised from an Avro record). WhooHoo!
val record_stream = kafka_stream
  .withColumn("record", from_json($"message", DeserializerWrapper.deserializer.getValueSchema(topic)))
  .select($"record.*")

// Allow querying through PySpark or SQL in another notebook cell.
record_stream
  .writeStream
  .outputMode("append")
  .format("memory")
  .queryName("in_mem_table")
  .start()
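Once the memory-sink query is running, the table can be read from another notebook cell. A sketch, assuming a live Databricks session where the stream above has started (`display` is Databricks-specific; plain `show()` works anywhere):

```scala
// In another Scala cell — or `SELECT * FROM in_mem_table` in a %sql cell:
display(spark.sql("SELECT * FROM in_mem_table LIMIT 10"))
```

Keep in mind the memory sink keeps all appended rows on the driver, so it is only suitable for debugging and small proofs of concept.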