ScalaPB on Spark

Created on 9 Dec 2016 · 14 Comments · Source: scalapb/ScalaPB

I know there are some issues getting ScalaPB to work with Spark. I know of this project, and it works great if you just want to use DataFrames; however, it does not work for Datasets. After a lot of digging I figured out the reason: enums in ScalaPB are implemented as case objects. That is a perfect representation of an enum, but the default Spark serializer needs a zero-argument constructor to work, and case objects do not provide one.
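
The constructor point can be checked with plain JVM reflection, without Spark. The sketch below is only an illustration: `Platform` here is a hypothetical hand-written enum, not ScalaPB-generated code, and `ConstructorCheck` is a made-up helper. It counts the public constructors on a case object's class. My assumption is that the Scala compiler makes the module class constructor private, so nothing is visible from outside; verify that on your own toolchain, and compile the file with these definitions at top level.

```scala
// Hypothetical stand-in for a ScalaPB-style enum; all names are illustrative.
sealed trait Platform { def value: Int }
object Platform {
  case object YOUTUBE extends Platform { val value = 0 }
  case object FACEBOOK extends Platform { val value = 1 }
}

object ConstructorCheck {
  // Number of public constructors visible through Java reflection.
  def publicConstructors(obj: AnyRef): Int =
    obj.getClass.getConstructors.length

  def main(args: Array[String]): Unit = {
    val cls = Platform.YOUTUBE.getClass
    println(s"${cls.getName}: ${publicConstructors(Platform.YOUTUBE)} public constructor(s)")
  }
}
```

A framework that instantiates types reflectively through a public no-argument constructor has nothing to call here, which matches the behavior described above.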

I figured out a workaround that will suffice for now, but it's not general enough to submit as a pull request. Assuming we have three enums called Platform, Dimension, and Definition, the following code creates a UserDefinedType for each of them.

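// Assumption: compiled under package org.apache.spark, since register() below
// looks the UDTs up there and Spark 2.0's UDT API is not public.
import com.trueaccord.scalapb.GeneratedEnum
import org.apache.spark.sql.types.{DataType, IntegerType, UserDefinedType}
import scala.reflect.ClassTag

abstract class GeneratedEnumUDT[T >: Null <: GeneratedEnum: ClassTag] extends UserDefinedType[T] {
  override def sqlType: DataType = IntegerType

  override def serialize(obj: T): Any = obj.value

  override def userClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
}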

class PlatformUDT extends GeneratedEnumUDT[Platform] {
  override def deserialize(datum: Any): Platform = Platform.fromValue(datum.asInstanceOf[Int])
}
class DimensionUDT extends GeneratedEnumUDT[Dimension] {
  override def deserialize(datum: Any): Dimension = Dimension.fromValue(datum.asInstanceOf[Int])
}
class DefinitionUDT extends GeneratedEnumUDT[Definition] {
  override def deserialize(datum: Any): Definition = Definition.fromValue(datum.asInstanceOf[Int])
}
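
The UDTs above store each enum as its integer value and rebuild it with fromValue. A minimal sketch of that round trip, without Spark, might look like the following. `Dimension` here is a hypothetical hand-written enum that only imitates the `value`/`fromValue` shape used above, and `RoundTrip` and `Unrecognized` are names I made up for the illustration.

```scala
// Hypothetical hand-written enum; it imitates the value/fromValue pair used above.
sealed trait Dimension { def value: Int }
object Dimension {
  case object TWO_D extends Dimension { val value = 0 }
  case object THREE_D extends Dimension { val value = 1 }
  // Placeholder for numbers this sketch does not know about.
  final case class Unrecognized(value: Int) extends Dimension

  def fromValue(v: Int): Dimension = v match {
    case 0     => TWO_D
    case 1     => THREE_D
    case other => Unrecognized(other)
  }
}

object RoundTrip {
  // Same idea as GeneratedEnumUDT.serialize: the stored form is just the Int.
  def serialize(d: Dimension): Any = d.value

  // Same idea as the per-enum deserialize: cast back to Int, then look up the value.
  def deserialize(datum: Any): Dimension =
    Dimension.fromValue(datum.asInstanceOf[Int])

  def main(args: Array[String]): Unit = {
    val stored = serialize(Dimension.THREE_D)
    println(s"stored = $stored, restored = ${deserialize(stored)}")
  }
}
```

Because only the integer is stored, the case object itself never has to be constructed or serialized; the lookup hands back the existing singleton.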

We then need to register these UDTs so the Catalyst serializer knows what to do with them. The following code does that generically, using the Reflections library.

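import org.apache.spark.sql.types.UDTRegistration
import org.reflections.Reflections
import scala.collection.JavaConverters._

// Registers a UDT for every GeneratedEnum found under basePackage.
def register(basePackage: String): Unit = {
  val enumReflections = new Reflections(basePackage)
  val classes = enumReflections.getSubTypesOf(classOf[GeneratedEnum]).asScala

  // Skip classes whose names contain '$', such as the case objects for individual values.
  classes.filter(!_.getName.contains("$")).foreach { clazz =>
    UDTRegistration.register(clazz.getName, s"org.apache.spark.${clazz.getSimpleName}UDT")
  }
}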
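
My reading of the `$` filter in register is that it keeps the enum's base type and drops the per-value case objects, whose JVM class names contain `$`. The sketch below shows that predicate on its own. `Definition` is again a hypothetical hand-written enum and `NameFilter` is a made-up helper; the definitions are assumed to sit at the top level of a compiled file, because inside a REPL or another object every class name would contain `$`.

```scala
// Hypothetical enum, assumed to be defined at the top level of a compiled file.
sealed trait Definition
object Definition {
  case object SD extends Definition
  case object HD extends Definition
}

object NameFilter {
  // Same predicate as in register(): drop class names that contain '$'.
  def withoutDollar(classes: Seq[Class[_]]): Seq[String] =
    classes.filter(!_.getName.contains("$")).map(_.getName)

  def main(args: Array[String]): Unit = {
    val candidates = Seq[Class[_]](
      classOf[Definition],
      Definition.SD.getClass,
      Definition.HD.getClass
    )
    // Print every candidate name, then the names that survive the filter.
    candidates.foreach(c => println(s"candidate: ${c.getName}"))
    withoutDollar(candidates).foreach(n => println(s"kept: $n"))
  }
}
```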

A few caveats to point out about this solution:

  1. It is not general enough. Ideally the *UDT classes from above would be auto generated for every enum found at compile time, but I'm not sure how to do this. I'd be willing to bet this can be done within sbt, but I'm not sure how.
  2. According to the documentation of UserDefinedType, the Spark people appear to be keen on replacing it soon so I'm not sure of the longevity of the above solution.
  3. The register function must be called before anyone can make a Dataset[Proto].

The last point could be avoided if we could annotate every generated enum within ScalaPB with SQLUserDefinedType. I'm not sure how this would work, because making Spark a dependency of the base ScalaPB project is probably not a good idea.

I'm posting this in the hope of spurring discussion, so that someone with extensive sbt experience can take my code and generalize it.

Ok, here is a first attempt based on your approach. It's a new custom generator that generates GeneratedEnumUDT subclasses. Let me know if it works for you.

Add the following line to your project/scalapb.sbt:

libraryDependencies += "com.trueaccord.scalapb" %% "sparksql-scalapb-gen" % "0.1.5"

Then, in your build.sbt add the new generator, so it looks like this:

libraryDependencies += "com.trueaccord.scalapb" %% "sparksql-scalapb" % "0.1.5"

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  new scalapb.UdtGenerator -> (sourceManaged in Compile).value
)

In your code, call the registration method:

YourPackage.ProtoFile${UDT}.register()

(Look for it in target/sparksql-scalapb/target/scala-2.11/src_managed/main if you can't find it)

Thanks for the follow-up. This doesn't seem to work for me. Here are the issues I'm having.

  1. When I do exactly as you suggest, I get the following errors:
[info] Compiling 13 Scala sources to /Users/jon.morra/git/commons-proto/target/scala-2.11/classes...
[error] /Users/jon.morra/git/commons-proto/target/scala-2.11/src_managed/main/zefr/commons/proto/Video/VideoProtoCompanionUdt.scala:4: object spark is not a member of package org.apache
[error]   class zefr__commons__proto__Video__Platform extends _root_.org.apache.spark.scalapb_hack.GeneratedEnumUDT[zefr.commons.proto.Video.Platform]
  2. When I then add spark-core and spark-sql to my dependencies in build.sbt such that build.sbt looks like
conflictManager := ConflictManager.strict

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  scalapb.UdtGenerator -> (sourceManaged in Compile).value
)

dependencyOverrides ++= Set(
  "commons-codec" % "commons-codec" % "1.10",
  "commons-io" % "commons-io" % "2.1",
  "com.google.protobuf" % "protobuf-java" % "3.1.0"
)

libraryDependencies ++= Seq(
  // For finding google/protobuf/descriptor.proto
  "com.trueaccord.scalapb" %% "sparksql-scalapb-gen" % "0.1.5",
  "org.apache.spark" %% "spark-core" % "2.0.2",
  "org.apache.spark" %% "spark-sql" % "2.0.2"
)

and my project/proto.sbt looks like

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.1")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3")

libraryDependencies ++= Seq(
  "com.trueaccord.scalapb" %% "sparksql-scalapb-gen" % "0.1.5"
)

I get the following errors

[info] Compiling 13 Scala sources to /Users/jon.morra/git/commons-proto/target/scala-2.11/classes...
[error] /Users/jon.morra/git/commons-proto/target/scala-2.11/src_managed/main/zefr/commons/proto/Video/VideoProtoCompanionUdt.scala:4: object scalapb_hack is not a member of package org.apache.spark
[error]   class zefr__commons__proto__Video__Platform extends _root_.org.apache.spark.scalapb_hack.GeneratedEnumUDT[zefr.commons.proto.Video.Platform]
  3. I also need the generated Java classes, and when I change my build.sbt as
conflictManager := ConflictManager.strict

PB.targets in Compile := Seq(
  PB.gens.java -> (sourceManaged in Compile).value,
  scalapb.gen(javaConversions = true, flatPackage = true) -> (sourceManaged in Compile).value,
  scalapb.UdtGenerator -> (sourceManaged in Compile).value
)

dependencyOverrides ++= Set(
  "commons-codec" % "commons-codec" % "1.10",
  "commons-io" % "commons-io" % "2.1",
  "com.google.protobuf" % "protobuf-java" % "3.1.0"
)

libraryDependencies ++= Seq(
  // For finding google/protobuf/descriptor.proto
  "com.trueaccord.scalapb" %% "sparksql-scalapb-gen" % "0.1.5",
  "org.apache.spark" %% "spark-core" % "2.0.2",
  "org.apache.spark" %% "spark-sql" % "2.0.2"
)

I get the following errors

[info] Compiling 13 Scala sources and 2 Java sources to /Users/jon.morra/git/commons-proto/target/scala-2.11/classes...
[error] /Users/jon.morra/git/commons-proto/target/scala-2.11/src_managed/main/zefr/commons/proto/Video/VideoProtoCompanionUdt.scala:1: Video is already defined as object Video
[error] package zefr.commons.proto.Video

I really appreciate your help with this. I think it will make it easy for ScalaPB users to use Protocol Buffers in Datasets, which has been a pain point of mine for a very long time.

Hi @jon-morra-zefr, fixes below:

  1. In build.sbt the dependency should be on sparksql-scalapb, not sparksql-scalapb-gen. Please also update the version to 0.1.6 (see later point).

  2. No need to add spark-sql and spark-core to your project if you didn't have to before. sparksql-scalapb adds a class under org.apache.spark.scalapb_hack (nasty, I know) to overcome the fact that UDTRegistration is a private Spark API...

  3. The last problem comes because you are using flat_package and the code generated by the UDT generator assumes you are not using flat_package. This is solved in sparksql-scalapb-gen 0.1.6 (update in project/proto.sbt), so you can use it like this:

PB.targets in Compile := Seq(
  PB.gens.java -> (sourceManaged in Compile).value,
  scalapb.gen(javaConversions = true, flatPackage = true) -> (sourceManaged in Compile).value,
  new scalapb.UdtGenerator(flatPackage = true) -> (sourceManaged in Compile).value
)

First, sorry about missing that comment about the gen in build.sbt. I fixed that and upgraded to 0.1.6. I now see these errors:

[info] Compiling 13 Scala sources and 2 Java sources to /Users/jon.morra/git/commons-proto/target/scala-2.11/classes...
[error] missing or invalid dependency detected while loading class file 'GeneratedEnumUDT.class'.
[error] Could not access term sql in package org.apache.spark,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'GeneratedEnumUDT.class' was compiled against an incompatible version of org.apache.spark.
[error] missing or invalid dependency detected while loading class file 'GeneratedEnumUDT.class'.
[error] Could not access type UserDefinedType in value org.apache.spark.types,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'GeneratedEnumUDT.class' was compiled against an incompatible version of org.apache.spark.types.
[error] missing or invalid dependency detected while loading class file 'GeneratedEnumUDT.class'.
[error] Could not access term sql in package org.apache.spark,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'GeneratedEnumUDT.class' was compiled against an incompatible version of org.apache.spark.
[error] missing or invalid dependency detected while loading class file 'GeneratedEnumUDT.class'.
[error] Could not access type UserDefinedType in value org.apache.spark.types,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'GeneratedEnumUDT.class' was compiled against an incompatible version of org.apache.spark.types.
[info] No documentation generated with unsuccessful compiler run
[error] two errors found
[error] two errors found
[error] (compile:doc) Scaladoc generation failed
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 6 s, completed Dec 14, 2016 9:24:01 AM

Here is my build.sbt

scalaVersion := "2.11.8"
organization := "zefr.commons"
name := "commons-proto"

incOptions := incOptions.value.withNameHashing(true)

val nexus = "http://nexus.zefr.com/repository/maven"
resolvers ++= Seq(
  Resolver.sonatypeRepo("releases"),
  Resolver.sonatypeRepo("snapshots"),
  "Neuxs Snapshots" at s"$nexus-snapshots",
  "Neuxs Releases" at s"$nexus-releases"
)

scalacOptions ++= Seq(
  // "-verbose",
  "-unchecked",
  "-deprecation",
  "-feature",
  "-Xverify",
  "-Ywarn-inaccessible",
  "-Ydead-code",
  "-Ywarn-unused",
  "-Yclosure-elim"
)

// Set the dependency conflict resolution behavior.  For more info, see:
//   http://www.scala-sbt.org/0.13/api/index.html#sbt.ConflictManager$
//   https://ant.apache.org/ivy/history/latest-milestone/settings/conflict-managers.html
conflictManager := ConflictManager.strict

libraryDependencies += "com.trueaccord.scalapb" %% "sparksql-scalapb" % "0.1.6"

PB.targets in Compile := Seq(
  PB.gens.java("3.1.0") -> (sourceManaged in Compile).value,
  scalapb.gen(javaConversions = true, flatPackage = true) -> (sourceManaged in Compile).value,
  new scalapb.UdtGenerator(flatPackage = true) -> (sourceManaged in Compile).value
)

and my project/proto.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.3")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3")

libraryDependencies += "com.trueaccord.scalapb" %% "sparksql-scalapb-gen" % "0.1.6"

Please let me know if I missed anything else.

Ok, I was wrong on not having to add sparksql as a dependency. It should probably be added as a "provided" dependency like this:

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.2" % "provided"

I also had to comment out the conflictManager := ConflictManager.strict line due to a guava-related version conflict (it looks like this originates from Spark, not ScalaPB).

Sorry for the delay. I am able to get past compilation. One question and one problem still remain:

  1. It looks like I still have to manually call VideoTitleProtoCompanionUdt.register() before using the data in a Dataset. Is that correct?

  2. I'm trying to cast a DataFrame to a Dataset[VideoTitleProto]. This worked before by using the following data structure:

val typeChangeMap = Map(
  "platform" -> udf[Option[Platform], String](Platform.fromName),
  "dimension" -> udf[Option[Dimension], String](Dimension.fromName),
  "definition" -> udf[Option[Definition], String](Definition.fromName)
)

I would then call the appropriate udf based on the column name. This now generates the following stack trace:

Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.SparkException: Failed to execute user defined function(anonfun$2: (string) => zefr__commons__proto__dimension)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
    ... 16 more

Can you please help resolve this?

Thanks

  1. Yes. You need to call register() for each proto file that contains a message you use in a Dataset. The register function takes care of calling register() for all of its dependencies.
  2. Can you post a full minimal project that demonstrates this problem? It says it fails to execute a function, but I don't know which.

OK, I was able to reproduce this error.

Here is my proto

syntax = "proto2";

package foo.test;

enum TestEnum {
    ABC = 0;
    DEF = 1;
}

message Foo {
    optional TestEnum testEnum = 1;
}

Here is my code

it should "show spark bug" in {
  import spark.implicits._
  TestMessageProtoUdt.register()
  val data = Seq("ABC", "ABC", "DEF", null)
  val df = spark.createDataset(data).withColumnRenamed("value", "testEnum")
  val typeChange = udf[Option[TestEnum], String](TestEnum.fromName)
  val typedDF = df.withColumn("testEnum", typeChange(df("testEnum")))
  val ds = typedDF.as[Foo]
  val localDS = ds.collect()
  val expected = Array(TestEnum.ABC, TestEnum.ABC, TestEnum.DEF)
  assertResult(expected)(localDS)
}

In my production code I assume the DataFrame already exists; I then want to change the column types and convert it to a Dataset.

@thesamet just a quick ping on this. Do you need more information from me?

Not sure which repo to ask about this on but... is this still unresolved @thesamet @jon-morra-zefr ?

Sorry, this fell through the cracks. Will look into this in the next few days.

I've just released sparksql-scalapb 0.1.8 which should address the issue described here. Tests that demonstrate how to use ScalaPB enums with SparkSQL are here: https://github.com/scalapb/sparksql-scalapb/blob/master/sparksql-scalapb/src/test/scala/DataSpec.scala

In case more issues like this come up in the future, it would be ideal to attach a pull request with a failing test in the above file, so the issue can be demonstrated by running sbt test.

Hello, folks. I know this is probably not the best place to ask, but I have a follow-up question to this problem.

Our project uses maven and ScalaPBC instead of sbt. What is the right way to tell it to run the Udt generator?

Thanks in advance,
Tiago

There's probably no way to do it right now. Can you file a feature request on GitHub? I'll look into this in the coming few weeks.

--
-Nadav
