Scalapb: Incompatibility between enums and Spark SQL

Created on 18 Feb 2016 · 8 comments · Source: scalapb/ScalaPB

Spark SQL attempts to infer the schema of your data using reflection. This works for case classes. ScalaPB messages are case classes, so I'd hoped this would just work for my collection of protos.

It's _close_. Schema discovery seems to work fine unless my message contains enums.

Here's some code:

```scala
case class MiniRide(pickupTime: Option[Int], totalAmount: Option[Float], paymentType: Option[Payment])

val ridesAsCaseClasses = ridesAsProtos.map(ride =>
  MiniRide(ride.pickupTime, ride.totalAmount, ride.paymentType))

ridesAsCaseClasses.toDF().registerTempTable("rides")
sqlContext.sql("SELECT COUNT(*) FROM rides WHERE totalAmount > 200")
  .collect()
  .foreach(println)
```

I'm working with the NYC Taxi data. See the full Rides proto. Payment is an enum. If I drop that field from the case class, this code works. If I include it, I get the following runtime error:
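For what it's worth, one workaround that sidesteps the reflection failure is to flatten the enum field to its integer value before handing the collection to Spark, so schema inference only ever sees primitive types. A minimal sketch, using a hypothetical stand-in for the ScalaPB-generated Payment enum (generated enums expose a `.value`):

```scala
// Hypothetical stand-in for the generated Payment enum; ScalaPB enums
// similarly expose an Int wire value via `.value`.
sealed trait Payment { def value: Int }
case object Cash extends Payment { val value = 1 }
case object Credit extends Payment { val value = 2 }

// Store the enum as its Int value so Spark's reflection-based schema
// inference sees only types it understands.
case class MiniRideFlat(pickupTime: Option[Int],
                        totalAmount: Option[Float],
                        paymentType: Option[Int])

val ride = MiniRideFlat(Some(1455750000), Some(250.5f), Some(Credit.value))
```

The cost is that you lose the enum's type safety inside SQL queries; you'd filter on the raw integer (or store `name` as a String instead).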

Exception in thread "main" scala.ScalaReflectionException: <none> is not a term
    at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:259)
    at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:682)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:659)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:693)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:691)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:691)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:630)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
    at org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder(SQLImplicits.scala:94)
    at com.sidewalklabs.TlcConverter$.main(TlcConverter.scala:49)
    at com.sidewalklabs.TlcConverter.main(TlcConverter.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I'm honestly not sure if this is more an issue for SparkSQL or for ScalaPB, but it would be nice if I could use SparkSQL with my protos!


All 8 comments

Maybe if the enums we generate were annotated with UserDefinedType, as explained here:
http://stackoverflow.com/questions/32440461/how-to-define-schema-for-custom-type-in-spark-sql

this would work. Could you try editing the generated code directly, following the SO example, and see if it helps? Then we can tell whether this can be fixed without special support from Spark SQL.
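For reference, a sketch of what such a UDT might look like against the Spark 1.x UserDefinedType developer API (this is an untested assumption, not the actual fix, and requires Spark on the classpath; `PaymentUDT` is a hypothetical name, while `value` and `fromValue` are the accessors ScalaPB-generated enums provide):

```scala
import org.apache.spark.sql.types._

// Hypothetical UDT mapping the generated Payment enum to an integer column.
class PaymentUDT extends UserDefinedType[Payment] {
  // Represent the enum as its Int wire value in the SQL schema.
  override def sqlType: DataType = IntegerType

  override def serialize(obj: Any): Any = obj match {
    case p: Payment => p.value
  }

  override def deserialize(datum: Any): Payment = datum match {
    case i: Int => Payment.fromValue(i)
  }

  override def userClass: Class[Payment] = classOf[Payment]
}
```

In principle the generated enum type would then carry an `@SQLUserDefinedType(udt = classOf[PaymentUDT])` annotation so Catalyst picks the UDT up during reflection; note the UDT API was marked DeveloperApi at the time.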

Sorry, I think that's beyond my Scala abilities. I'm happy to provide a more detailed repro if it helps.

Yes, a small repo I can fork that can help me reproduce this problem would be great. Out of curiosity, where/how do you store the input protocol buffers?

@thesamet I put together a minimal-ish repro here: https://github.com/danvk/scalapb-repro/

The code in that repro builds & runs successfully. If you uncomment the lines which use a message with an enum, however, you'll get the <none> is not a term error.

I have good news! I have added SparkSQL support for ScalaPB. See docs here: http://trueaccord.github.io/ScalaPB/sparksql.html

@thesamet how did you resolve the enum issue with Spark SQL?

It's been a while - sorry. If there's a specific issue you're encountering please let me know.
