We're running Hive and Presto on AWS EMR 5.11.1. This is Hive 2.3.2 and Presto 0.187.
We have a number of tables with the following sorts of DDL:
CREATE TABLE IF NOT EXISTS measurements (
id STRING,
session STRING,
tt_ts INT,
r_crs_id INT,
user_agent STRING
)
PARTITIONED BY(daydate STRING, epoch BIGINT)
CLUSTERED BY(r_crs_id) INTO 64 BUCKETS
STORED AS ORC
LOCATION's3://warehouse/'
tblproperties ("orc.compress"="ZLIB");
Our data is always inserted an hour at a time (the 'epoch' partition) and always with INSERT OVERWRITE in Hive.
When we insert with the 'mr' engine, Hive will create all 64 buckets, whether there is data in them or not. But Hive now defaults to the Tez engine, and mr is deprecated. The Tez engine will only create buckets when there is data to be inserted in a bucket. If all the buckets are not present then when querying with Presto an error such as the following will be returned:
Query XXXX failed: Hive table is corrupt. It is declared as being bucketed, but the files do not match the bucketing declaration. The number of files in the directory (14) does not match the declared bucket count (64) for partition: XXXXX
The error is easy to understand; Presto fails if not all buckets are created. This case is currently showing up with sparse test data. With production data we expect that in most cases the buckets will be filled and created. However we can't chance that a timestamp with sparse or missing data can cause all queries across a range to fail.
Therefore we are looking for clarification so we can ensure that when we move to production we understand whether we can use clustering and under what circumstances.
It appears that in Presto release 0.55 this issue was addressed. See https://prestodb.io/docs/current/release/release-0.55.html under "Hive Bucketed Table" fixes.
Is this a regression?
I received a response that the change referenced in 0.55 was from 4 1/2 years ago. Fair enough, but I'm still not sure how that relates to the issue at hand. What I'm trying to understand is if we simply can't use bucketed tables with Hive if we want to use Presto for queries, given that if we have sparse data for any partition Presto will fail if that partition is included in a range.
I've been working on this issue for a week, and have posted and received some answers in both the Hive and Presto mailing-list/forums. Here's what I have found out:
AFAIK, there is no way to force Hive with the Tez engine (and mr is deprecated) to create all the buckets. So no help from Hive.
There is no way to force Presto to continue when not all the buckets are present (bucket number does not match metastore). So no help from Presto.
So we would seem to be at an impasse. If any partition in the table doesn't have a full complement of buckets, a Presto query will fail. I can insert with the mr engine, but it is much slower and there is no way to tell if it will continue to be supported, since it is deprecated. We run into this in our test environment, but we cannot confidently deploy to production because we can't chance that any particular hour will result in fewer buckets in a partition. This makes Presto too fragile for our use when using buckets.
The only "solution" that I can think of is to do away with bucketing completely and to depend solely on ORC indexes, which seems like a pretty sub-standard solution.
My guess is that Facebook would almost never run into the issue, because there's likely so much data that all buckets are always created, even with Tez.
One solution that worked for me is to insert empty files for the missing buckets once the insert overwrite is done. It's a new step to add to your pipeline (and somehow hacky...) but this way I was able to keep using Tez and Presto stopped complaining!
Ugh! ;-) Temporarily, at least, I've changed it so we don't use buckets as we move to production. This is unfortunate from a performance POV. I've implemented ORC row indexes and sorted by the most important columns. Hopefully, this works but I would prefer to have buckets and sort by rows within those buckets.
What I would really like to see is Presto to be compatible with Hive - IOW if HiveQL works, then Presto would work. I also ran into 7120, which means we can't use bloom filters either.
Can you file an issue for Tez? Not creating all the bucket files seems to be a bug with no good workaround. Given how bucketing works, I don't understand how Tez's behavior is useful.
This is how Hive bucketing works: Rather than naming each bucket file with a specific name, such as bucket5, the file names are sorted and a bucket is simply the Nth file in the sorted list. Thus, if files are missing, you have no way of knowing which bucket number corresponds to a given file.
To @electrum's point, at the very least Tez should have an option to revert to the old behavior of creating empty files.
I tend to agree with you guys. I was actually thinking of 2 workarounds, but I agree that filing an issue with Tez that has an option to create all the bucket files is probably best.
That said, I still think that Presto should have an option that would warn but scan all files in the partition.
Someone pointed out that there was a similar "fix" 4 years ago.
I'll file the Tez bug.
@rabinnh the problem is during planning, Presto sees that the table is bucketed and then builds a query plan for a bucketed table. If the table is not actually bucketed, then the query plan is invalid and the query must fail. I believe Tez works around this by scanning all of the partitions during planning, but that takes ages which is why Presto does not do that during planning. The easy fix, is to simply remove the bucketing flag from these tables since they actually can not be used for bucket aware queries.
Understood, thanks for the explanation. Let's see what the Tez guys say. If they can provide a conf var that will make Tez backwards compatible with MR on Hive then it should make everyone happy. All processes that perform queries would be able to take advantage of buckets.
Have you considered loading your data using Presto? It will always create the proper number of bucket files.
Presto isn’t just for interactive queries — it should hopefully be able to run everything.
@electrum
Query 20180403_114917_00005_95kcz failed: Inserting into bucketed sorted tables is not supported. Table 'default.api_measurements'
On your suggestion, I tried but there are 2 issues. The one above and that Presto doesn't appear to allow SORT BY on inserts.
Presto doesn't support writing bucketed sorted tables, but it also can't take advantage of them, so there is no benefit to doing so. (It can write bucketed, but not bucketed+sorted)
For tables without sorted buckets, sorting on insert doesn't really make sense. The files will come out sorted, but there's nothing in the metadata to indicate or guarantee that, so readers can't take advantage of it.
Thank you for that. I changed the code to use Presto for INSERTs and compromised by not using SORTED BY in the DDL when I create the table.
I had another, hacky workaround that @WhoisDavid recommended, which is that after doing a Hive insert with PyHive I used boto to create 0 length files for those that were missing with the same naming scheme as Hive used.
It's cleaner just using PyHive with the Presto connector. At least I have a solution. I asked a Dev (Gopal) from the Hive project to respond to this ticket to see if the Hive/Presto teams can reconcile the root issue, which is that any inserts done for bucketed tables with Hive/Tez may not create all the files.
Gopal's position is, and I quote:
"Presto could fix their fail-safe for bucketing implementation to actually trust the Hive bucketing spec & get you out of this mess - the bucketing contract for Hive is actual file name -> hash % buckets (Utilities::getBucketIdFromFile).
The file-count is a very flaky way to check if the table is bucketed correctly - either you trust the user to have properly bucketed the table or you don't use it. Failing to work on valid tables does look pretty bad, instead of soft fallbacks."
Utilities::getBucketIdFromFile
@rabinnh, This answers my previous question to you asking how Hive determines buckets.
But what about this in code in Hive (org.apache.hadoop.hive.ql.exec.Partition)?
public Path getBucketPath(int bucketNum) {
FileStatus srcs[] = getSortedPaths();
if (srcs == null) {
return null;
}
return srcs[bucketNum].getPath();
}
It's doing sorting. As a result, it wouldn't work if the file count doesn't match. Is the code dead code now? It's still in Hive code base.
@haozhun I'd suggest that you head over to the Hive mailing list and ask there. That was a quote from those developers.
@rabinnh What are your files named when you write a partition with Tez?
@haozhun in current Hive code base, Partition.getBucketPath(int) seems to be applied to sampling only, so it may correct under _some_ assumptions or even take some shortcuts.
@electrum Same as with Hive, just not all of them, example:
000000_00
000001_00
000002_00
etc
Basically 0 based, zero padded 6 digits followed by underscore 0. I assume that the '_0' is for updates or deletes, but we only write each partition idempotently, so I don't know for sure.
Actually, I found this, which gives a much more thorough description (we use ORC):
Thanks @rabinnh for getting this sorted out.
We are happy to make changes. But before we decide the path forward, I have two more questions.
to actually trust the Hive bucketing spec & get you out of this mess
This implies there is a bucketing spec. It would help if you can point us to that. This way, we can make a reference to that when we introduce a fix.
The file-count is a very flaky way to check if the table is bucketed correctly - either you trust the user to have properly bucketed the table or you don't use it. Failing to work on valid tables does look pretty bad, instead of soft fallbacks."
We agree on this position from Gopal. However, Gopal V introduced getBucketIdFromFile in Hive-11525 for Hive 2.0. https://github.com/apache/hive/commit/619ff6e99ab1814a31da52b743c4a7fc5d9b938a. Before that, was there anywhere in Hive where the naming pattern is depended on when trying to determine the bucket number of a file? If there were a way other than sorting back then, we were not aware of it.
Notes:
nnnnnnn_0_bucketN_queryID@haozhun As Gopal says in the quote:
"The bucketing contract for Hive is actual file name -> hash % buckets (Utilities::getBucketIdFromFile)."
So I think that he's saying that function is the "spec". I did ask again though just to see if there was some more formal documentation.
@rabinnh
I read the function you pointed out previously. But I didn't actually understand what this part of the sentence is trying to say: actual file name -> hash % buckets.
The code also doesn't have a modulo in it. Do you mind explaining that sentence?
@haozhun I am NOT the Hive developer. I am just a noob user. I have been corresponding with Gopal, who is the Hive developer, so I'm afraid I am in the same boat as you.
One more note; I discovered today that Presto will also fail a query if the S3 bucket is valid, but there are no partitions created at all yet with the same error. I would have expected it to return no data.
A corner case, I'll grant you.
presto:default> select * from nti_measurements where epoch = 1522857600;
Query 20180404_225234_00037_95kcz, FAILED, 3 nodes
Splits: 16 total, 0 done (0.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
Query 20180404_225234_00037_95kcz failed: Hive table is corrupt. It is declared as being bucketed, but the files do not match the bucketing declaration. The number of files in the directory (0) does not match the declared bucket count (61) for partition: daydate=2018-04-04/epoch=1522857600
Is this the expected behavior? I guess there has to be at least one partition with data?
The Hive query:
hive> select * from nti_measurements where epoch = 1522857600;
OK
Time taken: 3.328 seconds
bq. Presto will also fail a query if the S3 bucket is valid, but there are no partitions created at all
@rabinnh sounds like a separate issue, would you create one?
I understand this would happen also when we take existing partitioned & bucketed table and delete all data, right?
Yes, it's the same either way. Empty S3 bucket, no partitions created yet (S3 bucket empty). I suppose that if the "bucket count" issue is resolved it will solve this as well. Still think I should address it in a separate issue?
@haozhun I got a response from Gopal on the Hive mailing list, which I quote below. Short answer seems to be that there isn't a written specification, per se:
This worked the other way around in time, than writing a spec first - ACIDv1 implemented Streaming ingest via Storm, it used an explicit naming "bucket_
" for the filename. Since until the compaction runs the actual base files don't exist, the ACID bucketing implementation has to handle missing buckets as 0 rows in base file + possibly more rows in uncompacted deltas.
ACID's implementation has forced the two bucketing implementations to work similarly, for the ability to do bucket map-joins between ACID & non-ACID bucketed tables. Particularly about the modulus for -ve numbers, which was broken in Hive-1.0.
https://issues.apache.org/jira/browse/HIVE-12025
that's the place where this all got refactored so that joins & filters for bucketed tables work the same way for ACID & non-ACID tables.
Because of that spec lives in the comments now as a Regex.
They were looking for something more explicit, I think.
I think a simple unit test will probably help them a bit more.
create external table bucketed (x int) clustered by (x) into 4 buckets stored as orc;
insert into bucketed values(1),(2),(3),(4);
insert into bucketed values(1),(2),(3),(4);0: jdbc:hive2://localhost:2181/> dfs -ls /apps/hive/warehouse/bucketed;
| -rw-r--r-- 3 hive hdfs 181 2018-04-04 23:13 /apps/hive/warehouse/bucketed/000000_0 |
| -rw-r--r-- 3 hive hdfs 181 2018-04-04 23:14 /apps/hive/warehouse/bucketed/000000_0_copy_1 |
| -rw-r--r-- 3 hive hdfs 181 2018-04-04 23:13 /apps/hive/warehouse/bucketed/000001_0 |
| -rw-r--r-- 3 hive hdfs 181 2018-04-04 23:14 /apps/hive/warehouse/bucketed/000001_0_copy_1 |
| -rw-r--r-- 3 hive hdfs 181 2018-04-04 23:13 /apps/hive/warehouse/bucketed/000002_0 |
| -rw-r--r-- 3 hive hdfs 181 2018-04-04 23:14 /apps/hive/warehouse/bucketed/000002_0_copy_1 |
| -rw-r--r-- 3 hive hdfs 181 2018-04-04 23:13 /apps/hive/warehouse/bucketed/000003_0 |
| -rw-r--r-- 3 hive hdfs 181 2018-04-04 23:14 /apps/hive/warehouse/bucketed/000003_0_copy_1 |Even when all buckets are covered Presto should be expecting >1 files per bucket.
I saw a JIRA comment which said "sort in file order and assign buckets", you can see that is only applicable for the 1st insert to table (& the regex will remove the copy numbering).
And oddly enough this week, I saw an academic paper with a negative analysis of Hive bucketing.
Hello,
Is there any timeline for when this will be included in a released version of Presto?
Hello,
We are facing similar issue when running presto queries on hive ORC (transactional tables)
Environment - AWS EMR - 5.13.0
Hive Version - Hive 2.3.2-amzn-2
Presto Version - 0.194
Error : Hive table 'presto-orc-test' is corrupt. Found sub-directory in bucket directory for partition:
Should we wait for the newer version of presto or is there any workaround to resolve the issue?
Any help would be greatly appreciated.
Thank you again
You don't really provide a lot of detail about your structure other than the error message. I would suggest posting to the mailing list with more detail.
Our issue was that we defined our tables with CLUSTERED BY. Hive only creates the files that it needs, while Presto expects the number of files to exactly match the number of buckets specified. Your issue (from what I can gather) is that you are using transactional Hive tables. It sounds remotely related but the solutions would likely be different.
FWIW, we tried the following:
We ended up simply not using CLUSTERED BY in our table definitions. We do use well designed partitioning and ORC indexes. For our application we don't believe that we've lost much performance, if any by not using bucketing.
We are facing similar issue when running presto queries on hive ORC (transactional tables)
@cskbhatt please refer to https://github.com/prestodb/presto/issues/1970
@electrum
Presto doesn't support writing bucketed sorted tables, but it also can't take advantage of them, so there is no benefit to doing so. (It can write bucketed, but not bucketed+sorted)
Actually sorting is the primary disk saving option for ORC. Less in size also means less to read and therefore it also improves the performance. It would be great if Presto was able to insert into bucketed+sorted tables since Tez is going the wrong direction and MR is being deprecated.
@l1x supported for writing sorted bucketed was added a couple of months ago.
@dain thanks for the update, good to know! Do you know the exact version?
It was added in 0.202. In 0.208, we removed the size limitation. However, that change introduced a data loss bug, so wait for 0.209 (should be in the next few days).
I haven't seen Spark mentioned anywhere around here, so here is a quick summary - it also produces fewer, non-empty buckets. If there are fewer datapoints than buckets, you are guaranteed to run into this.
Since Spark is becoming quite popular, I expect more people to run into this. Perhaps someone from Teradata could offer a patch in their version (https://github.com/prestodb/presto/issues/8557#issuecomment-317249970) or Presto could adopt the filename-based bucket detection (rather than just sorting the files and taking the n-th file) - but I'm very new to this problem, only been reading upon this for the past hour or so.
It might converge from Spark's side instead, though. If you look at slide 49 here, Facebook is working on improving bucketing support in Spark, including generating of empty files. These JIRAs have, unfortunately, been unresolved for quite some time now.
The issue of fewer buckets than specified seems to be resolved in PrestoSQL in https://github.com/prestosql/presto/pull/822
With EMR 5-21 (Presto 0.215), still got this issue: Query 20191107_222140_00006_rf89j failed: Hive table 'dev.wifi_logs' is corrupt. The number of files in the directory (256) does not match the declared bucket count (64) for partition: date_key=2019-11-05
Are there any configurations to ignore this check?
Is there a workaround In Presto to handle multiple files per bucket (a table with 10 buckets has 40 files, 4 files per bucket).
prestosql has fixed this last year https://github.com/prestosql/presto/pull/822
so would like to check if there's a workaround for Prestodb when generating files using Spark.