Azure-docs: How to authenticate against SchemaRegistry?

Created on 4 Dec 2019 · 7 comments · Source: MicrosoftDocs/azure-docs

The Schema Registry is normally protected with basic authentication.
The relevant confluent-kafka settings are:

  • schema.registry.basic.auth.credentials.source=USER_INFO
  • schema.registry.basic.auth.user.info=Key:Secret

How do I authenticate against the Confluent Schema Registry?
Using this example I get a 401 (authentication error).
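A quick way to check whether the key/secret pair itself is accepted, independently of Spark, is to call the registry's REST API with an HTTP Basic `Authorization` header built from `Key:Secret` (this is exactly what `basic.auth.user.info` encodes). A sketch of building that header; the `basicAuthHeader` helper and the endpoint in the comment are illustrative, not from the original post:

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// Builds the HTTP Basic Authorization header value from a Schema Registry
// API key and secret, in the same "key:secret" shape basic.auth.user.info uses.
def basicAuthHeader(key: String, secret: String): String = {
  val userInfo = s"$key:$secret"
  "Basic " + Base64.getEncoder.encodeToString(userInfo.getBytes(StandardCharsets.UTF_8))
}

// e.g. curl -H "Authorization: <header>" https://<your-registry>/subjects
val header = basicAuthHeader("Key", "Secret")
println(header)
```

If this request already returns 401, the credentials themselves are wrong; if it succeeds, the problem is in how Spark passes them on.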




All 7 comments

Thank you for your detailed feedback. We are looking into the issue, and we will respond when we have more information.

We have assigned the issue to the content author. They will evaluate and update the document as appropriate.

Hi @abij. Thank you for bringing this to our attention. We are reviewing the article.

in-progress

Hi @abij - this feature is not currently supported. Please submit your product feedback here: https://feedback.azure.com/forums/909463-azure-databricks

please-close

@abij did you find a way to work around this successfully? We are currently in the same situation.

Well, we decided not to use the Avro-encoded records downstream; we just validate them and use JSON.
As a proof of concept I have developed a working solution.

Note there are some rough edges...

// Install these packages on the cluster using Maven first:
// Coordinates: io.confluent:kafka-avro-serializer:5.4.1, Repository: https://packages.confluent.io/maven/
// Coordinates: org.apache.kafka:kafka-clients:5.4.1-ce, Repository: https://packages.confluent.io/maven/

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import io.confluent.kafka.schemaregistry.client._
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.avro.SchemaConverters
import java.util.HashMap

val topic = ""                // Kafka topic to read
val kafka_bootstrap = ""      // Kafka bootstrap servers (host:port)
val kafka_jaas_conf = ""      // SASL JAAS config string
val schema_url = ""           // Schema Registry URL
val schema_client_id = ""     // Schema Registry API key
val schema_client_secret = "" // Schema Registry API secret

// Deserialises Confluent-framed Avro bytes into the record's JSON string representation.
class MyAvroDeserializer extends AbstractKafkaAvroDeserializer with Serializable {
  def this(client: SchemaRegistryClient) {
    this()
    this.schemaRegistry = client
  }

  override def deserialize(bytes: Array[Byte]): String = {
    val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
    genericRecord.toString
  }

  // Looks up the latest value schema for the topic and converts it to a Spark SQL schema.
  def getValueSchema(topic: String): StructType = {
    val metadata = DeserializerWrapper.client.getLatestSchemaMetadata(topic + "-value")
    val avroSchema = DeserializerWrapper.client.getBySubjectAndId(topic + "-value", metadata.getId)
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
  }

}

object DeserializerWrapper extends Serializable {
  // @transient + lazy so each executor builds its own client (it is not serialisable).
  @transient lazy val client = new CachedSchemaRegistryClient(
    schema_url, 128,
    new HashMap[String, String]() {
      put("basic.auth.credentials.source", "USER_INFO")
      put("basic.auth.user.info", schema_client_id + ":" + schema_client_secret)
    }
  )

  @transient lazy val deserializer = new MyAvroDeserializer(client)

  val deserialize = udf((bytes: Array[Byte]) =>
    DeserializerWrapper.deserializer.deserialize(bytes)
  )
}

val kafka_stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafka_bootstrap)
  .option("kafka.security.protocol","SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.jaas.config", kafka_jaas_conf)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .withColumn("message", DeserializerWrapper.deserialize($"value"))

// Note: message is a String holding the JSON (deserialised from the Avro record). WhooHoo!

val record_stream = kafka_stream
  .withColumn("record", from_json($"message", DeserializerWrapper.deserializer.getValueSchema(topic)))
  .select($"record.*")

// Allows querying through PySpark or SQL in another notebook cell.
record_stream
  .writeStream
  .outputMode("append")
  .format("memory")
  .queryName("in_mem_table")
  .start()
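For anyone wondering why a registry-aware deserializer is needed at all: Confluent-serialised messages are not plain Avro. Each one starts with a 5-byte header (magic byte 0, then the 4-byte big-endian schema ID) that `AbstractKafkaAvroDeserializer` uses to fetch the writer schema from the registry. A minimal sketch of reading that header; the `schemaIdOf` helper name is mine:

```scala
import java.nio.ByteBuffer

// Confluent wire format: [magic byte 0][4-byte big-endian schema id][Avro binary payload]
def schemaIdOf(bytes: Array[Byte]): Int = {
  val buf = ByteBuffer.wrap(bytes)
  require(buf.get() == 0, "not Confluent-framed data (unexpected magic byte)")
  buf.getInt() // the registry id of the writer schema
}

// Example frame for schema id 42 with an empty payload:
val frame = ByteBuffer.allocate(5).put(0.toByte).putInt(42).array()
println(schemaIdOf(frame)) // prints 42
```

This is also why feeding these bytes straight to Spark's `from_avro` fails: the 5-byte header is not part of the Avro payload.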
