Environment:
Data Stored in S3
Using Hive Metastore
Parquet Written with Spark
Presto 0.164
Issue:
Can't read columns that are of Decimal type
Example:
 ptntstus | varchar      |       |
 ded_amt  | decimal(9,2) |       |
presto:default> select * from table;
Query 20170126_231448_00065_r6pat, FAILED, 3 nodes
Splits: 316 total, 49 done (15.51%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]
Query 20170126_231448_00065_r6pat failed: Can not read value at 0 in block 0 in file s3a://path/part-r-00025-f64e61ca-62ff-40af-bd56-be1ee464f8b7.gz.parquet
However, I can read the other columns fine.
Can you provide instructions for how to write the table using Spark (for someone who has never used Spark)?
Also, what version of Spark and Parquet (in Spark) are you using?
It took a bit to recreate the error from the write side since I didn't write the original files. Working through a reproducible example, I think I found more specifically where it breaks down.
Spark Version: 2.0.2
Parquet: 1.7.0 (From Spark POM)
When the precision isn't specified, Presto has no issues. The read failure comes from explicitly specifying the precision and scale:
import scala.math.BigDecimal
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

case class test_parquet(first: String, second: BigDecimal)

// Test 1: no explicit precision/scale -- Spark infers its default decimal(38,18)
val rdd = sc.parallelize(List(test_parquet("a", BigDecimal(100, 2))))
val df = rdd.toDF
val toFile = "s3a://bucket/michael/bug_example.parquet"
df.write.mode(SaveMode.Overwrite).parquet(toFile)
spark.sqlContext.createExternalTable("decimal_bug_example", toFile)

// Test 2: cast to an explicit decimal(4,2)
val df2 = df.withColumn("second", col("second").cast(DecimalType(4,2)))
val toFile2 = "s3a://bucket/michael/big_example_specified.parquet"
df2.write.mode(SaveMode.Overwrite).parquet(toFile2)
spark.sqlContext.createExternalTable("decimal_bug_example_specified", toFile2)
From the Presto side:
presto:default> select * from decimal_bug_example;
 first |        second
-------+----------------------
 a     | 1.000000000000000000
(1 row)
Query 20170127_202609_00020_sf4i9, FINISHED, 3 nodes
Splits: 31 total, 31 done (100.00%)
0:00 [1 rows, 588B] [2 rows/s, 1.3KB/s]
Second Example:
presto:default> select * from decimal_bug_example_specified;
Query 20170127_202637_00021_sf4i9, FAILED, 3 nodes
Splits: 31 total, 8 done (25.81%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
Query 20170127_202637_00021_sf4i9 failed: Can not read value at 0 in block 0 in file s3a://bucket/michael/big_example_specified.parquet/part-r-00014-5c5017a0-dfa7-4af8-b02b-54b9b8ec6233.snappy.parquet
I wrote the files using the Spark shell running on YARN.
Sorry for the delay in getting the reproducible example.
Can you attach the files to help whoever works on this?
big_example_specified.parquet.tar.gz
bug_example.parquet.tar.gz
Attached are the gzipped files.
I think the difference between the two cases comes from the smaller precision in the second example, which makes parquet-mr write the column as int32 instead of fixed_len_byte_array.
$ hadoop jar parquet-tools-1.9.0.jar schema bug_example.parquet/part-r-00000-*
message spark_schema {
  optional binary first (UTF8);
  optional fixed_len_byte_array(16) second (DECIMAL(38,18));
}

$ hadoop jar parquet-tools-1.9.0.jar schema big_example_specified.parquet/part-r-00000-*
message spark_schema {
  optional binary first (UTF8);
  optional int32 second (DECIMAL(4,2));
}
@nezihyigitbasi or @zhenxiao any ideas?
I took a look at this issue with the new Parquet reader (didn't check the old one, the ParquetHiveRecordCursor). Our decimal support in the new reader doesn't properly implement the Parquet spec -- we only handle the fixed_len_byte_array case from the spec. Some work needs to be done to add support for the rest.
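For whoever picks this up: per the spec the stored value is just the unscaled integer, so decoding the missing cases mostly means applying the column's declared scale. A rough sketch of the idea (helper names are made up, this is not Presto's reader API):

// Illustrative only -- not Presto code. The unscaled value plus the
// column's scale reconstructs the decimal for each physical type.
def fromInt32(unscaled: Int, scale: Int): BigDecimal  = BigDecimal(BigInt(unscaled), scale)   // e.g. (100, 2) -> 1.00
def fromInt64(unscaled: Long, scale: Int): BigDecimal = BigDecimal(BigInt(unscaled), scale)
def fromBytes(bytes: Array[Byte], scale: Int): BigDecimal =
  BigDecimal(BigInt(bytes), scale)       // big-endian two's-complement unscaled value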
As a workaround you can force Spark to use fixed_len_byte_array with a configuration option:
spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
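If it helps, in the repro above that flag would go right before the second write (untested against the exact repro, but it's the same config key):

// Apply the workaround in the earlier repro: with the legacy layout,
// decimals are written as fixed_len_byte_array regardless of precision.
spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
df2.write.mode(SaveMode.Overwrite).parquet(toFile2)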
@mastratton3 I put up a PR (#7537) to fix this issue, can you give it a try?
This should be fixed now, please reopen if you have any problems.
Thanks for fixing this @nezihyigitbasi! Sorry I haven't been able to test it yet. Should be able to soon.