presto:tiny> CREATE TABLE test_bucket (
-> bucket_key INT,
-> foo VARCHAR,
-> bar VARCHAR,
-> partition_key VARCHAR)
-> WITH (
-> partitioned_by=ARRAY['partition_key'],
-> bucketed_by=ARRAY['bucket_key'],
-> bucket_count=16);
CREATE TABLE
presto:tiny> INSERT INTO test_bucket VALUES (1, 'foo', 'bar', '2018-12-18');
INSERT: 1 row
Query 20181219_202234_58401_d46p5, FINISHED, 18 nodes
Splits: 22 total, 22 done (100.00%)
0:06 [0 rows, 0B] [0 rows/s, 0B/s]
presto:tiny> EXPLAIN (TYPE DISTRIBUTED)
-> SELECT rank() OVER (PARTITION BY bucket_key)
-> FROM test_bucket;
Query Plan
-----------------------------------------------------------------------------------------------------------------
Fragment 0 [SINGLE]
Output layout: [rank]
Output partitioning: SINGLE []
Grouped Execution: false
- Output[_col0] => [rank:bigint]
_col0 := rank
- RemoteSource[1] => [rank:bigint]
Fragment 1 [HASH]
Output layout: [rank]
Output partitioning: SINGLE []
Grouped Execution: false
- Project[] => [rank:bigint]
- Window[partition by (bucket_key)][$hashvalue] => [bucket_key:integer, $hashvalue:bigint, rank:bigint]
rank := rank() RANGE UNBOUNDED_PRECEDING CURRENT_ROW
- LocalExchange[HASH][$hashvalue] ("bucket_key") => bucket_key:integer, $hashvalue:bigint
- RemoteSource[2] => [bucket_key:integer, $hashvalue_4:bigint]
Fragment 2 [prism:buckets=16, hiveTypes=[int]]
Output layout: [bucket_key, $hashvalue_5]
Output partitioning: HASH [bucket_key][$hashvalue_5]
Grouped Execution: false
- ScanProject[table = prism:di:test_bucket, grouped = false] => [bucket_key:integer, $hashvalue_5:bigint]
$hashvalue_5 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("bucket_key"), 0))
LAYOUT: di.test_bucket bucket=16
bucket_key := bucket_key:int:0:REGULAR
partition_key:string:-1:PARTITION_KEY
:: [[2018-12-18]]
(1 row)
Query 20181219_202654_59652_d46p5, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
It looks to me like, if the window partition columns (e.g. ['a', 'b', 'c']) are a superset of the bucket columns (e.g. ['a', 'b']), we don't need a remote exchange, since the required rows are already available locally on each worker. In that case fragments 1 and 2 could be combined into one fragment.
Let me know if I'm missing something obvious/essential. If this looks good, I could work on this optimization.
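For illustration, the condition I have in mind is roughly the following (a standalone sketch with hypothetical names, not actual planner code):

import java.util.List;
import java.util.Set;

public class BucketSupersetCheck
{
    // Hypothetical check: a remote exchange could be skipped when the window's PARTITION BY
    // columns contain all of the table's bucketing columns, because every row sharing those
    // bucket column values already lives on one worker.
    static boolean remoteExchangeAvoidable(Set<String> windowPartitionColumns, List<String> bucketedBy)
    {
        return windowPartitionColumns.containsAll(bucketedBy);
    }

    public static void main(String[] args)
    {
        // e.g. window PARTITION BY ['a', 'b', 'c'] over a table bucketed by ['a', 'b']
        System.out.println(remoteExchangeAvoidable(Set.of("a", "b", "c"), List.of("a", "b"))); // true
        System.out.println(remoteExchangeAvoidable(Set.of("a", "c"), List.of("a", "b")));      // false
    }
}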
cc @wenleix @haozhun
cc @kokosing , @sopel39
In your example:
presto:tiny> EXPLAIN (TYPE DISTRIBUTED)
-> SELECT rank() OVER (PARTITION BY bucket_key)
-> FROM test_bucket;
I think you need to repartition after the table scan to combine rows with the same bucket_key but from different partitions (unless we realize there is exactly one partition).
@shixuan-fan does the same happen when the table is not partitioned?
I think you need to repartition after the table scan to combine rows with the same bucket_key but from different partitions (unless we realize there is exactly one partition).
Actually, I think Hive does support buckets that span partitions, since splits for the same bucket but from different partitions should be scheduled on the same worker.
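As a rough illustration of why the partition shouldn't matter here (a hypothetical sketch, not the Hive connector's actual hashing or scheduling code): the bucket a row falls into is a function of the bucket column value alone, and bucketed execution maps each bucket number to a single worker, regardless of which partition the split came from.

import java.util.List;

public class BucketAssignmentSketch
{
    // Hypothetical stand-in for the bucket function: the real Hive bucketing hash differs,
    // but the point is that it depends only on the bucket column value, never on the partition.
    static int bucket(int bucketKey, int bucketCount)
    {
        return Math.floorMod(Integer.hashCode(bucketKey), bucketCount);
    }

    public static void main(String[] args)
    {
        int bucketCount = 16;
        List<String> workers = List.of("worker-0", "worker-1", "worker-2");

        // Rows with bucket_key = 1 from partitions '2018-12-18' and '2018-12-19' fall into
        // the same bucket, and bucketed execution assigns that bucket to one worker.
        int b = bucket(1, bucketCount);
        System.out.printf("bucket_key=1 -> bucket %d -> %s (for every partition)%n",
                b, workers.get(b % workers.size()));
    }
}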
Could you check what ConnectorTableLayout#getNodePartitioning returns for this table? I expect it to be non-empty for the table.
Does the same happen when you run GROUP BY?
Do you have hive.bucket_execution_enabled and plan_with_table_node_partitioning enabled?
Otherwise I suggest debugging AddExchanges and PropertyDerivations to see why an exchange was added.
@findepi:
It is the same for an unpartitioned table.
presto:di> CREATE TABLE test_bucket_unpartitioned (bucket_key, foo, bar)
-> WITH (
-> bucketed_by=ARRAY['bucket_key'],
-> bucket_count=16)
-> AS
-> SELECT 1, 'foo', 'bar';
CREATE TABLE: 1 row
presto:di> EXPLAIN (TYPE DISTRIBUTED)
-> SELECT rank() OVER (PARTITION BY bucket_key)
-> FROM test_bucket_unpartitioned;
...
Fragment 1 [HASH]
...
- Window[partition by (bucket_key)][$hashvalue] => [bucket_key:integer, $hashvalue:bigint, rank:bigint]
rank := rank() RANGE UNBOUNDED_PRECEDING CURRENT_ROW
- LocalExchange[HASH][$hashvalue] ("bucket_key") => bucket_key:integer, $hashvalue:bigint
- RemoteSource[2] => [bucket_key:integer, $hashvalue_4:bigint]
Fragment 2 [prism:buckets=16, hiveTypes=[int]]
Output layout: [bucket_key, $hashvalue_5]
...
@sopel39:
I've checked that hive.bucket_execution_enabled and plan_with_table_node_partitioning are both set to true. For GROUP BY, there is no additional remote exchange between the aggregation and the table scan.
...
- Aggregate(FINAL)[bucket_key][$hashvalue_6] => [bucket_key:integer, $hashvalue_6:bigint]
- LocalExchange[HASH][$hashvalue_6] ("bucket_key") => bucket_key:integer, $hashvalue_6:bigint
- Aggregate(PARTIAL)[bucket_key][$hashvalue_7] => [bucket_key:integer, $hashvalue_7:bigint]
- ScanProject[table = prism:di:test_bucket, grouped = false] => [bucket_key:integer, $hashvalue_7:bigint]
...
There is no ConnectorTableLayout#getNodePartitioning so I'm a bit confused. ActualProperties#getNodePartitioning is not empty, and ConnectorTableLayout#getTablePartitioning is not empty either. Hope I'm looking at the right method.
It looks like the remote exchange is added here, after checking only isStreamPartitionedOn():
https://github.com/prestodb/presto/blob/7dfbe1be3c61455047827335c5dadd94125e0c48/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java#L309-L311
Yet in visitAggregation() (https://github.com/prestodb/presto/blob/7dfbe1be3c61455047827335c5dadd94125e0c48/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java#L232), I see that we also check isNodePartitionedOn() in addition to isStreamPartitionedOn().
For a window function, if the PARTITION BY columns include the columns the data is node-partitioned on, I think a local exchange should suffice?
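If that reading is right, the window path could mirror the aggregation path along these lines (a simplified, self-contained sketch; only the two property checks named above come from AddExchanges, everything else is a placeholder):

import java.util.List;

public class ExchangeDecisionSketch
{
    // Stand-in for Presto's ActualProperties; only the two methods the thread mentions are assumed.
    interface StreamProperties
    {
        boolean isStreamPartitionedOn(List<String> columns);

        boolean isNodePartitionedOn(List<String> columns);
    }

    // Window path today (roughly): add a remote exchange unless the child stream is already
    // partitioned on the PARTITION BY columns.
    static boolean windowNeedsRemoteExchange(StreamProperties child, List<String> partitionBy)
    {
        return !child.isStreamPartitionedOn(partitionBy);
    }

    // Aggregation path (roughly): also accept node partitioning (e.g. from a bucketed table
    // layout) and rely on a local exchange inside the fragment instead.
    static boolean aggregationNeedsRemoteExchange(StreamProperties child, List<String> partitionBy)
    {
        return !child.isStreamPartitionedOn(partitionBy)
                && !child.isNodePartitionedOn(partitionBy);
    }
}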
There is no ConnectorTableLayout#getNodePartitioning so I'm a bit confused.
Of course I meant ConnectorTableLayout#getTablePartitioning.
It looks like the remote exchange is added here, after checking only isStreamPartitionedOn(). Yet in visitAggregation(), I see that we also check isNodePartitionedOn() in addition to isStreamPartitionedOn().
Nice find! That's probably the cause. Do you want to provide a fix?
Sure. Will put up a pull request. Thanks for confirming :)
Closed via #12122