I know there are some issues getting ScalaPB to work with Spark. I know of this project, and it works great if you just want to use DataFrames; however, it does not work for Datasets. After a lot of digging I figured out why: enums in ScalaPB are implemented as case objects. That is a perfect representation of an enum, but the default Spark serializer requires a zero-argument constructor to make everything work, and case objects do not provide one.
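To make that concrete, here is a dependency-free sketch of roughly what ScalaPB generates for a proto enum (the name Platform and its cases are invented for illustration): a sealed trait with one case object per value, plus a fromValue lookup like the one the deserialize methods below rely on.

```scala
// Hypothetical, simplified version of ScalaPB's generated enum code.
sealed trait Platform { def value: Int }

object Platform {
  case object Desktop extends Platform { val value = 0 }
  case object Mobile  extends Platform { val value = 1 }

  // ScalaPB generates a value -> case object lookup roughly like this.
  def fromValue(v: Int): Platform = v match {
    case 0 => Desktop
    case 1 => Mobile
  }
}
```

Because Desktop and Mobile are singletons rather than ordinary classes, reflection finds no public zero-argument constructor to instantiate, which is exactly what trips up Spark's default serializer.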
I figured out a workaround that suffices for now, but it's not general enough to submit as a pull request. Assuming we have three enums called Platform, Dimension, and Definition, the following code creates a UserDefinedType for each of them.
import scala.reflect.ClassTag
import org.apache.spark.sql.types.{DataType, IntegerType, UserDefinedType}
import com.trueaccord.scalapb.GeneratedEnum

abstract class GeneratedEnumUDT[T >: Null <: GeneratedEnum: ClassTag] extends UserDefinedType[T] {
  // Enums are stored as their integer wire value.
  override def sqlType: DataType = IntegerType
  override def serialize(obj: T): Any = obj.value
  override def userClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
}

class PlatformUDT extends GeneratedEnumUDT[Platform] {
  override def deserialize(datum: Any): Platform = Platform.fromValue(datum.asInstanceOf[Int])
}

class DimensionUDT extends GeneratedEnumUDT[Dimension] {
  override def deserialize(datum: Any): Dimension = Dimension.fromValue(datum.asInstanceOf[Int])
}

class DefinitionUDT extends GeneratedEnumUDT[Definition] {
  override def deserialize(datum: Any): Definition = Definition.fromValue(datum.asInstanceOf[Int])
}
We then need to register these UDTs so the Catalyst serializer knows what to do with them. The following code does that generically, using the Reflections library.
import scala.collection.JavaConverters._
import org.reflections.Reflections

// Note: UDTRegistration is private[spark], so this code must itself live
// under the org.apache.spark package hierarchy to compile.
def register(basePackage: String): Unit = {
  val enumReflections = new Reflections(basePackage)
  val classes = enumReflections.getSubTypesOf(classOf[GeneratedEnum]).asScala
  classes.filter(!_.getName.contains("$")).foreach { clazz =>
    UDTRegistration.register(clazz.getName, s"org.apache.spark.${clazz.getSimpleName}UDT")
  }
}
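As an aside on the filter in the code above: Reflections discovers not just the sealed enum traits but also their nested subclasses, whose JVM names contain a "$". Only the trait itself should be registered. A dependency-free illustration (the class names here are invented):

```scala
// Typical JVM class names Reflections might return for one ScalaPB enum:
// the sealed trait itself plus the module classes of its case objects.
object FilterDemo {
  val discovered = Seq(
    "com.example.Platform",             // the enum trait: register it
    "com.example.Platform$DESKTOP$",    // a case object's module class: skip
    "com.example.Platform$Unrecognized" // another nested helper class: skip
  )

  // Same filter as in register(): drop anything with "$" in its name.
  val toRegister = discovered.filter(!_.contains("$"))
}
```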
A few caveats to point out about this solution:
Dataset[Proto]. The last point could be avoided if we could annotate every generated enum within ScalaPB with SQLUserDefinedType. I'm not sure how that would work, because making Spark a dependency of the base ScalaPB project is probably not a good idea.
My hope is that this spurs discussion, so that someone with extensive sbt experience can take my code and generalize it.
Ok, here is a first attempt based on your approach. It's a new custom generator that generates GeneratedEnumUDT subclasses. Let me know if it works for you.
Add the following line to your project/scalapb.sbt:
libraryDependencies += "com.trueaccord.scalapb" %% "sparksql-scalapb-gen" % "0.1.5"
Then, in your build.sbt add the new generator, so it looks like this:
libraryDependencies += "com.trueaccord.scalapb" %% "sparksql-scalapb" % "0.1.5"
PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  new scalapb.UdtGenerator -> (sourceManaged in Compile).value
)
In your code, call the registration method:
YourPackage.ProtoFile${UDT}.register()
(Look for it in target/sparksql-scalapb/target/scala-2.11/src_managed/main if you can't find it)
Thanks for the follow up. This doesn't seem to work for me. Here are the issues I'm having.
[info] Compiling 13 Scala sources to /Users/jon.morra/git/commons-proto/target/scala-2.11/classes...
[error] /Users/jon.morra/git/commons-proto/target/scala-2.11/src_managed/main/zefr/commons/proto/Video/VideoProtoCompanionUdt.scala:4: object spark is not a member of package org.apache
[error] class zefr__commons__proto__Video__Platform extends _root_.org.apache.spark.scalapb_hack.GeneratedEnumUDT[zefr.commons.proto.Video.Platform]
conflictManager := ConflictManager.strict
PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value,
  scalapb.UdtGenerator -> (sourceManaged in Compile).value
)

dependencyOverrides ++= Set(
  "commons-codec" % "commons-codec" % "1.10",
  "commons-io" % "commons-io" % "2.1",
  "com.google.protobuf" % "protobuf-java" % "3.1.0"
)

libraryDependencies ++= Seq(
  // For finding google/protobuf/descriptor.proto
  "com.trueaccord.scalapb" %% "sparksql-scalapb-gen" % "0.1.5",
  "org.apache.spark" %% "spark-core" % "2.0.2",
  "org.apache.spark" %% "spark-sql" % "2.0.2"
)
and my project/proto.sbt looks like
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.1")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3")
libraryDependencies ++= Seq(
  "com.trueaccord.scalapb" %% "sparksql-scalapb-gen" % "0.1.5"
)
I get the following errors
[info] Compiling 13 Scala sources to /Users/jon.morra/git/commons-proto/target/scala-2.11/classes...
[error] /Users/jon.morra/git/commons-proto/target/scala-2.11/src_managed/main/zefr/commons/proto/Video/VideoProtoCompanionUdt.scala:4: object scalapb_hack is not a member of package org.apache.spark
[error] class zefr__commons__proto__Video__Platform extends _root_.org.apache.spark.scalapb_hack.GeneratedEnumUDT[zefr.commons.proto.Video.Platform]
conflictManager := ConflictManager.strict
PB.targets in Compile := Seq(
  PB.gens.java -> (sourceManaged in Compile).value,
  scalapb.gen(javaConversions = true, flatPackage = true) -> (sourceManaged in Compile).value,
  scalapb.UdtGenerator -> (sourceManaged in Compile).value
)

dependencyOverrides ++= Set(
  "commons-codec" % "commons-codec" % "1.10",
  "commons-io" % "commons-io" % "2.1",
  "com.google.protobuf" % "protobuf-java" % "3.1.0"
)

libraryDependencies ++= Seq(
  // For finding google/protobuf/descriptor.proto
  "com.trueaccord.scalapb" %% "sparksql-scalapb-gen" % "0.1.5",
  "org.apache.spark" %% "spark-core" % "2.0.2",
  "org.apache.spark" %% "spark-sql" % "2.0.2"
)
I get the following errors
[info] Compiling 13 Scala sources and 2 Java sources to /Users/jon.morra/git/commons-proto/target/scala-2.11/classes...
[error] /Users/jon.morra/git/commons-proto/target/scala-2.11/src_managed/main/zefr/commons/proto/Video/VideoProtoCompanionUdt.scala:1: Video is already defined as object Video
[error] package zefr.commons.proto.Video
I really appreciate your help with this as I think it'll make it really easy for users of ScalaPB to use Protocol Buffers in Datasets trivially which has been a pain point of mine for a very long time.
Hi @jon-morra-zefr , fixes below:
In build.sbt the dependency should be on sparksql-scalapb, not sparksql-scalapb-gen. Please also update the version to 0.1.6 (see later point).
No need to add spark-core and spark-sql to your project if you didn't have to before. sparksql-scalapb adds a class under org.apache.spark.scalapb_hack (nasty, I know) to overcome the fact that UDTRegistration is a private Spark API...
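For anyone curious how that hack works: Scala's private[p] qualifier grants access to everything compiled inside package p, so a class placed under the org.apache.spark tree can call the otherwise-inaccessible UDTRegistration. A dependency-free sketch, with the invented names library standing in for org.apache.spark and Registry for UDTRegistration:

```scala
package library {
  // Analogous to Spark's UDTRegistration: visible only within `library`.
  private[library] object Registry {
    def register(name: String): String = s"registered $name"
  }

  package hack {
    // Analogous to the scalapb_hack package: because it sits inside
    // `library`, it may call the package-private Registry.
    object Caller {
      def call(): String = Registry.register("MyUDT")
    }
  }
}
```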
The last problem comes up because you are using flat_package, while the code generated by the UDT generator assumes you are not. This is solved in sparksql-scalapb-gen 0.1.6 (update it in project/proto.sbt), so you can use it like this:
PB.targets in Compile := Seq(
  PB.gens.java -> (sourceManaged in Compile).value,
  scalapb.gen(javaConversions = true, flatPackage = true) -> (sourceManaged in Compile).value,
  new scalapb.UdtGenerator(flatPackage = true) -> (sourceManaged in Compile).value
)
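To clarify what flat_package changes: without it, ScalaPB nests generated code inside an extra container named after the .proto file (e.g. zefr.commons.proto.Video.Platform, as in the error messages above); with it, the file-level container disappears (zefr.commons.proto.Platform), so the UDT generator has to emit matching package paths. A simplified, hedged model of that name mapping (the real rules have more cases, such as java_package overrides):

```scala
// Simplified model of where ScalaPB places a generated enum type.
object PackageMapping {
  def enumFqn(protoPackage: String, fileContainer: String,
              enumName: String, flatPackage: Boolean): String =
    if (flatPackage) s"$protoPackage.$enumName"
    else s"$protoPackage.$fileContainer.$enumName"
}
```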
First, sorry about missing that comment about the gen in build.sbt. I fixed that and upgraded to 0.1.6. I now see these errors:
[info] Compiling 13 Scala sources and 2 Java sources to /Users/jon.morra/git/commons-proto/target/scala-2.11/classes...
[error] missing or invalid dependency detected while loading class file 'GeneratedEnumUDT.class'.
[error] Could not access term sql in package org.apache.spark,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'GeneratedEnumUDT.class' was compiled against an incompatible version of org.apache.spark.
[error] missing or invalid dependency detected while loading class file 'GeneratedEnumUDT.class'.
[error] Could not access type UserDefinedType in value org.apache.spark.types,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'GeneratedEnumUDT.class' was compiled against an incompatible version of org.apache.spark.types.
[error] missing or invalid dependency detected while loading class file 'GeneratedEnumUDT.class'.
[error] Could not access term sql in package org.apache.spark,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'GeneratedEnumUDT.class' was compiled against an incompatible version of org.apache.spark.
[error] missing or invalid dependency detected while loading class file 'GeneratedEnumUDT.class'.
[error] Could not access type UserDefinedType in value org.apache.spark.types,
[error] because it (or its dependencies) are missing. Check your build definition for
[error] missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.)
[error] A full rebuild may help if 'GeneratedEnumUDT.class' was compiled against an incompatible version of org.apache.spark.types.
[info] No documentation generated with unsuccessful compiler run
[error] two errors found
[error] two errors found
[error] (compile:doc) Scaladoc generation failed
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 6 s, completed Dec 14, 2016 9:24:01 AM
Here is my build.sbt
scalaVersion := "2.11.8"
organization := "zefr.commons"
name := "commons-proto"
incOptions := incOptions.value.withNameHashing(true)
val nexus = "http://nexus.zefr.com/repository/maven"
resolvers ++= Seq(
  Resolver.sonatypeRepo("releases"),
  Resolver.sonatypeRepo("snapshots"),
  "Nexus Snapshots" at s"$nexus-snapshots",
  "Nexus Releases" at s"$nexus-releases"
)
scalacOptions ++= Seq(
  // "-verbose",
  "-unchecked",
  "-deprecation",
  "-feature",
  "-Xverify",
  "-Ywarn-inaccessible",
  "-Ydead-code",
  "-Ywarn-unused",
  "-Yclosure-elim"
)
// Set the dependency conflict resolution behavior. For more info, see:
// http://www.scala-sbt.org/0.13/api/index.html#sbt.ConflictManager$
// https://ant.apache.org/ivy/history/latest-milestone/settings/conflict-managers.html
conflictManager := ConflictManager.strict
libraryDependencies += "com.trueaccord.scalapb" %% "sparksql-scalapb" % "0.1.6"
PB.targets in Compile := Seq(
  PB.gens.java("3.1.0") -> (sourceManaged in Compile).value,
  scalapb.gen(javaConversions = true, flatPackage = true) -> (sourceManaged in Compile).value,
  new scalapb.UdtGenerator(flatPackage = true) -> (sourceManaged in Compile).value
)
and my project/proto.sbt
addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.3")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.3")
libraryDependencies += "com.trueaccord.scalapb" %% "sparksql-scalapb-gen" % "0.1.6"
Please let me know if I missed anything else.
Ok, I was wrong about not having to add spark-sql as a dependency. It should probably be added as a "provided" dependency like this:
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.2" % "provided"
I also had to comment out conflictManager := ConflictManager.strict due to a Guava-related version conflict (it looks like this originates from Spark, not ScalaPB).
Sorry for the delay. I am able to get past compilation. One question and one problem still remain
It looks like I still have to manually call VideoTitleProtoCompanionUdt.register() before using the data in a Dataset. Is that correct?
I'm trying to cast a DataFrame to a Dataset[VideoTitleProto]. This worked before by using the following data structure:
val typeChangeMap = Map(
  "platform" -> udf[Option[Platform], String](Platform.fromName),
  "dimension" -> udf[Option[Dimension], String](Dimension.fromName),
  "definition" -> udf[Option[Definition], String](Definition.fromName)
)
I would then call the appropriate udf based on the column name. This now generates the following stack trace:
Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.SparkException: Failed to execute user defined function(anonfun$2: (string) => zefr__commons__proto__dimension)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
... 16 more
Can you please help resolve this?
Thanks
OK, I was able to reproduce this error.
Here is my proto
syntax = "proto2";

package foo.test;

enum TestEnum {
  ABC = 0;
  DEF = 1;
}

message Foo {
  optional TestEnum testEnum = 1;
}
Here is my code
it should "show spark bug" in {
  import spark.implicits._
  TestMessageProtoUdt.register()
  val data = Seq("ABC", "ABC", "DEF", null)
  val df = spark.createDataset(data).withColumnRenamed("value", "testEnum")
  val typeChange = udf[Option[TestEnum], String](TestEnum.fromName)
  val typedDF = df.withColumn("testEnum", typeChange(df("testEnum")))
  val ds = typedDF.as[Foo]
  val localDS = ds.collect()
  val expected = Array(TestEnum.ABC, TestEnum.ABC, TestEnum.DEF)
  assertResult(expected)(localDS)
}
For the purposes of my production code, assume the DataFrame is already made; I then want to do the type changes and convert it to a Dataset.
@thesamet just a quick ping on this. Do you need more information from me?
Not sure which repo to ask about this on but... is this still unresolved @thesamet @jon-morra-zefr ?
Sorry, this fell through the cracks. Will look into this in the next few days.
I've just released sparksql-scalapb 0.1.8 which should address the issue described here. Tests that demonstrate how to use ScalaPB enums with SparkSQL are here: https://github.com/scalapb/sparksql-scalapb/blob/master/sparksql-scalapb/src/test/scala/DataSpec.scala
In case more issues like this come up in the future, it would be ideal to attach a pull request with a failing test in the above file, so the issue can be demonstrated by running sbt test.
Hello, folks. I know this is probably not the best place to ask, but I have a follow-up question to this problem.
Our project uses maven and ScalaPBC instead of sbt. What is the right way to tell it to run the Udt generator?
Thanks in advance,
Tiago
There's probably no way to do it right now. Can you file a feature request
on github and I'll look into this in the coming few weeks.