Presto: Can't read decimal type in Parquet files written by Spark and referenced as external in the Hive metastore

Created on 27 Jan 2017 · 11 Comments · Source: prestodb/presto

Environment:
Data Stored in S3
Using Hive Metastore
Parquet Written with Spark
Presto 0.164

Issue:
Can't read columns that are of Decimal type

Example:
ptntstus | varchar      | |
ded_amt  | decimal(9,2) | |

presto:default> select * from table;

Query 20170126_231448_00065_r6pat, FAILED, 3 nodes
Splits: 316 total, 49 done (15.51%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20170126_231448_00065_r6pat failed: Can not read value at 0 in block 0 in file s3a://path/part-r-00025-f64e61ca-62ff-40af-bd56-be1ee464f8b7.gz.parquet

However I can read other columns fine.

Most helpful comment

As a work around you can force Spark to use fixed_len_byte_array with a configuration option:
spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")

All 11 comments

Can you provide instructions for how to write the table using Spark (for someone who has never used Spark)?

Also, what version of Spark and Parquet (in Spark) are you using?

It took a bit to recreate the error on the write side, since I didn't write the original files. While building a reproducible example, I think I narrowed down more specifically where it breaks down.

Spark Version: 2.0.2
Parquet: 1.7.0 (From Spark POM)

When the precision isn't specified, Presto has no issues. The read failure comes from specifying the precision and scale:

import scala.math.BigDecimal
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

case class test_parquet(first: String, second: BigDecimal)

val rdd = sc.parallelize(List(test_parquet("a", BigDecimal(100, 2))))

val df = rdd.toDF

val toFile = "s3a://bucket/michael/bug_example.parquet"

df.write.mode(SaveMode.Overwrite).parquet(toFile)

spark.sqlContext.createExternalTable("decimal_bug_example", toFile)

// Test 2

val df2 = df.withColumn("second", col("second").cast(DecimalType(4,2)))

val toFile2 = "s3a://bucket/michael/big_example_specified.parquet"

df2.write.mode(SaveMode.Overwrite).parquet(toFile2)

spark.sqlContext.createExternalTable("decimal_bug_example_specified", toFile2)

From the presto side:

presto:default> select * from decimal_bug_example;
 first |        second        
-------+----------------------
 a     | 1.000000000000000000 
(1 row)

Query 20170127_202609_00020_sf4i9, FINISHED, 3 nodes
Splits: 31 total, 31 done (100.00%)
0:00 [1 rows, 588B] [2 rows/s, 1.3KB/s]
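A side note on the 1.000000000000000000 in that result: with no precision specified, Spark infers its default DecimalType(38, 18) for a Scala BigDecimal, so the stored BigDecimal(100, 2), i.e. 1.00, gets rescaled to 18 fractional digits. A quick plain-Scala sanity check (no Spark needed):

```scala
// With no precision given, Spark stores BigDecimal(100, 2) (= 1.00) as
// DECIMAL(38, 18); rescaling to 18 fractional digits reproduces exactly
// the value Presto printed above.
val widened = BigDecimal(100, 2).setScale(18)
println(widened)  // 1.000000000000000000
```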

Second Example:

presto:default> select * from decimal_bug_example_specified;

Query 20170127_202637_00021_sf4i9, FAILED, 3 nodes
Splits: 31 total, 8 done (25.81%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20170127_202637_00021_sf4i9 failed: Can not read value at 0 in block 0 in file s3a://bucket/michael/big_example_specified.parquet/part-r-00014-5c5017a0-dfa7-4af8-b02b-54b9b8ec6233.snappy.parquet

I wrote the files using the spark shell running on Yarn.

Sorry for the delay in getting the reproducible example.

Can you attach the file to help whoever works on this?

I think the difference between the two cases comes from the smaller precision in the second example, which makes parquet-mr write the column as int32 instead of fixed_len_byte_array.

$ hadoop jar parquet-tools-1.9.0.jar schema bug_example.parquet/part-r-00000-*
message spark_schema {
  optional binary first (UTF8);
  optional fixed_len_byte_array(16) second (DECIMAL(38,18));
}

$ hadoop jar parquet-tools-1.9.0.jar schema big_example_specified.parquet/part-r-00000-*
message spark_schema {
  optional binary first (UTF8);
  optional int32 second (DECIMAL(4,2));
}
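The rule behind those two schemas can be sketched in plain Scala. The thresholds come from the Parquet decimal spec (the function name is made up for illustration, not an actual parquet-mr API):

```scala
// Sketch of the Parquet spec's rule for a decimal's physical type;
// parquet-mr's non-legacy writer follows the same thresholds.
// Illustrative helper, not an actual parquet-mr API.
def physicalTypeForDecimal(precision: Int): String =
  if (precision <= 9) "int32"          // unscaled value fits in 32 bits
  else if (precision <= 18) "int64"    // unscaled value fits in 64 bits
  else {
    // smallest byte width whose signed range covers 10^precision - 1
    val width = Iterator.from(1)
      .find(n => BigInt(2).pow(8 * n - 1) - 1 >= BigInt(10).pow(precision) - 1)
      .get
    s"fixed_len_byte_array($width)"
  }
```

This matches the schemas above: DECIMAL(4,2) lands on int32, while DECIMAL(38,18) needs a 16-byte fixed_len_byte_array.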

@nezihyigitbasi or @zhenxiao any ideas?

I took a look at this issue with the new Parquet reader (didn't check the old one, the ParquetHiveRecordCursor), and our decimal support in the new reader doesn't fully implement the Parquet spec -- we only handle the fixed_len_byte_array case. Some work needs to be done to add support for the other encodings (int32, int64, binary).
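For context, decoding the int32 case is mechanically simple: the stored integer is the unscaled value, and the scale comes from the schema. A minimal sketch (hypothetical helper, not Presto's actual reader code):

```scala
// What a reader must do for an int32-backed DECIMAL(p, s): the raw int
// is the unscaled value; apply the schema's scale to recover the
// logical decimal. Hypothetical helper, not Presto's actual code.
def decodeInt32Decimal(raw: Int, scale: Int): BigDecimal =
  BigDecimal(BigInt(raw), scale)
```

For example, a stored 100 in a DECIMAL(4,2) column decodes to 1.00.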

As a work around you can force Spark to use fixed_len_byte_array with a configuration option:
spark.sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")

@mastratton3 I put up a PR ( #7537) to fix this issue, can you give it a try?

This should be fixed now, please reopen if you have any problems.

Thanks for fixing this @nezihyigitbasi! Sorry I haven't been able to test it yet. Should be able to soon.
