When `row_number()` is combined with a limit (a predicate that keeps only the first N row numbers of each partition), Presto uses a TopN algorithm instead of a full sort, so such queries run faster and use less memory.
```sql
EXPLAIN (TYPE DISTRIBUTED)
SELECT *
FROM (
    SELECT
        orderkey,
        clerk,
        totalprice,
        row_number() OVER (PARTITION BY clerk ORDER BY totalprice DESC) AS rnk
    FROM orders
)
WHERE rnk <= 5;
```
Query plan for the `row_number()` query:

```
Fragment 0 [SINGLE]
    Output layout: [orderkey, clerk, totalprice, row_number_2]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Output[orderkey, clerk, totalprice, rnk] => [[orderkey, clerk, totalprice, row_number_2]]
            Cost: ?, Output: ? rows (?B)
            rnk := row_number_2
        - RemoteSource[1] => [[orderkey, totalprice, clerk, row_number_2]]
                Cost: ?, Output: ? rows (?B)

Fragment 1 [HASH]
    Output layout: [orderkey, totalprice, clerk, row_number_2]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Project[] => [[orderkey, totalprice, clerk, row_number_2]]
            Cost: ?, Output: ? rows (?B)
        - TopNRowNumber[partition by (clerk), order by (totalprice DESC_NULLS_LAST) limit 5][$hashvalue] => [[orderkey, totalprice, clerk, $hashvalue, row_number_2]]
                Cost: ?, Output: ? rows (?B)
                row_number_2 := row_number()
            - LocalExchange[HASH][$hashvalue] ("clerk") => [[orderkey, totalprice, clerk, $hashvalue]]
                    Cost: ?, Output: ? rows (?B)
                - RemoteSource[2] => [[orderkey, totalprice, clerk, $hashvalue_34]]
                        Cost: ?, Output: ? rows (?B)

Fragment 2 [tpch:orders:15000]
    Output layout: [orderkey, totalprice, clerk, $hashvalue_35]
    Output partitioning: HASH [clerk][$hashvalue_35]
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - TopNRowNumber[partition by (clerk), order by (totalprice DESC_NULLS_LAST) limit 5][$hashvalue_35] => [[orderkey, totalprice, clerk, $hashvalue_3
            Cost: ?, Output: ? rows (?B)
            row_number_2 := row_number()
        - ScanProject[table = tpch:orders:sf0.01, grouped = false] => [[orderkey, totalprice, clerk, $hashvalue_35]]
                Cost: ?, Output: ? rows (?B)
                $hashvalue_35 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("clerk"), 0))
                clerk := tpch:clerk
                orderkey := tpch:orderkey
                totalprice := tpch:totalprice
                tpch:orderstatus
                    :: [[F], [O], [P]]
```
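Conceptually, the `TopNRowNumber` nodes in this plan (one per split in Fragment 2, one per partition after the hash exchange in Fragment 1) avoid a full sort by keeping only the N best rows seen so far for each partition key. The following is a minimal Java sketch of that idea, not Presto's actual operator; the `Order` record and the `TopNRowNumberSketch` class are hypothetical, and rows are reduced to `(clerk, totalprice)`.

```java
import java.util.*;

// Hypothetical row type for illustration: (clerk, totalprice).
record Order(String clerk, double totalprice) {}

final class TopNRowNumberSketch {
    // Returns, per clerk, the n orders with the highest totalprice in
    // descending order, i.e. the rows that satisfy row_number() <= n.
    static Map<String, List<Order>> topN(List<Order> rows, int n) {
        if (n <= 0) {
            return new TreeMap<>();
        }
        Comparator<Order> byPrice = Comparator.comparingDouble(Order::totalprice);
        // One min-heap per partition: its root is the worst kept row,
        // which is the one to evict when a better row arrives.
        Map<String, PriorityQueue<Order>> heaps = new HashMap<>();
        for (Order row : rows) {
            PriorityQueue<Order> heap =
                heaps.computeIfAbsent(row.clerk(), k -> new PriorityQueue<>(byPrice));
            if (heap.size() < n) {
                heap.add(row);
            } else if (byPrice.compare(row, heap.peek()) > 0) {
                heap.poll();   // drop the current worst row
                heap.add(row); // each partition never holds more than n rows
            }
        }
        Map<String, List<Order>> result = new TreeMap<>();
        heaps.forEach((clerk, heap) -> {
            List<Order> sorted = new ArrayList<>(heap);
            sorted.sort(byPrice.reversed()); // row_number 1..k in DESC order
            result.put(clerk, sorted);
        });
        return result;
    }
}
```

Memory per partition is bounded by `n`, but every partition pays for its own heap, so the total overhead grows with the number of distinct partition keys.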
A similar query using `rank()` is currently not optimized, because `rank()` output differs from `row_number()` output: “equal” rows (ties) receive the same rank.
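For a concrete picture of the difference, this small hypothetical Java helper assigns `row_number()`, `rank()` and `dense_rank()` to values that are already sorted in descending order (as `ORDER BY totalprice DESC` would sort them within a partition). It illustrates the SQL semantics only and is not Presto code.

```java
final class RankingSketch {
    // sortedDesc must already be in the window's ORDER BY order (here DESC).
    // Returns {row_number, rank, dense_rank} for each position.
    static int[][] rankings(int[] sortedDesc) {
        int[][] out = new int[sortedDesc.length][3];
        int rank = 0;
        int denseRank = 0;
        for (int i = 0; i < sortedDesc.length; i++) {
            boolean newPeerGroup = i == 0 || sortedDesc[i] != sortedDesc[i - 1];
            if (newPeerGroup) {
                rank = i + 1; // rank skips past the tied rows before it
                denseRank++;  // dense_rank leaves no gaps
            }
            out[i][0] = i + 1; // row_number: ties get distinct numbers
            out[i][1] = rank;
            out[i][2] = denseRank;
        }
        return out;
    }
}
```

For the input 10, 9, 9, 7, 7, 5, 1 it assigns row numbers 1 to 7, ranks 1, 2, 2, 4, 4, 6, 7 and dense ranks 1, 2, 2, 3, 3, 4, 5. A filter `row_number <= 4` therefore keeps only one of the two 7s, while `rank <= 4` must keep both, which is why the plain TopN shortcut does not apply to `rank()`.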
```sql
EXPLAIN (TYPE DISTRIBUTED)
SELECT *
FROM (
    SELECT
        orderkey,
        clerk,
        totalprice,
        rank() OVER (PARTITION BY clerk ORDER BY totalprice DESC) AS rnk
    FROM orders
)
WHERE rnk <= 5;
```
Query plan for the `rank()` query:

```
Fragment 0 [SINGLE]
    Output layout: [orderkey, clerk, totalprice, rank]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Output[orderkey, clerk, totalprice, rnk] => [[orderkey, clerk, totalprice, rank]]
            Cost: ?, Output: ? rows (?B)
            rnk := rank
        - RemoteSource[1] => [[orderkey, totalprice, clerk, rank]]
                Cost: ?, Output: ? rows (?B)

Fragment 1 [HASH]
    Output layout: [orderkey, totalprice, clerk, rank]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Filter[filterPredicate = ("rank" <= BIGINT '5')] => [[orderkey, totalprice, clerk, rank]]
            Cost: ?, Output: ? rows (?B)
        - Project[] => [[orderkey, totalprice, clerk, rank]]
                Cost: ?, Output: ? rows (?B)
            - Window[partition by (clerk), order by (totalprice DESC_NULLS_LAST)][$hashvalue] => [[orderkey, totalprice, clerk, $hashvalue, rank]]
                    Cost: ?, Output: ? rows (?B)
                    rank := rank() RANGE UNBOUNDED_PRECEDING CURRENT_ROW
                - LocalExchange[HASH][$hashvalue] ("clerk") => [[orderkey, totalprice, clerk, $hashvalue]]
                        Estimates: {rows: 15000 (483.40kB), cpu: 1845000.00, memory: 0.00, network: 495000.00}
                        Cost: ?, Output: ? rows (?B)
                    - RemoteSource[2] => [[orderkey, totalprice, clerk, $hashvalue_33]]
                            Cost: ?, Output: ? rows (?B)

Fragment 2 [tpch:orders:15000]
    Output layout: [orderkey, totalprice, clerk, $hashvalue_34]
    Output partitioning: HASH [clerk][$hashvalue_34]
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - ScanProject[table = tpch:orders:sf0.01, grouped = false] => [[orderkey, totalprice, clerk, $hashvalue_34]]
            Estimates: {rows: 15000 (483.40kB), cpu: 360000.00, memory: 0.00, network: 0.00}/{rows: 15000 (483.40kB), cpu: 855000.00, memory: 0.00, ne
            Cost: ?, Output: ? rows (?B)
            $hashvalue_34 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("clerk"), 0))
            clerk := tpch:clerk
            orderkey := tpch:orderkey
            totalprice := tpch:totalprice
```
Actually, a "rank with a limit" query could be optimized the same way: keep the plan shape of the `row_number()` query, but replace the `TopNRowNumber` execution with one that outputs the top N *allowing duplicates* (ties):
input: 1, 9, 7, 9, 5, 10, 7
top 4 using a heap: 10, 9, 9, 7
top 4 allowing duplicates: 10, 9, 9, 7, 7
Top N allowing duplicates could be implemented with a map plus a heap. This optimization avoids the sort in `WindowOperator` and makes the query run faster with less memory.
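A minimal Java sketch of the map + heap approach, assuming a single partition and `ORDER BY key DESC` (an engine would keep one instance per partition key); `TopNWithTies` is a hypothetical name, not a Presto class. The map groups tied rows by key, a min-heap of distinct keys exposes the lowest peer group so it can be evicted, and a row is kept exactly when `rank() <= n`, i.e. when fewer than `n` kept rows have a strictly greater key.

```java
import java.util.*;

// Hypothetical sketch: keeps exactly the rows whose rank() under
// ORDER BY key DESC is <= n, using a map plus a heap.
final class TopNWithTies<K extends Comparable<K>, R> {
    private final int n;
    private final Map<K, List<R>> rowsByKey = new HashMap<>();   // key -> tied rows
    private final PriorityQueue<K> keys = new PriorityQueue<>(); // min-heap of kept keys
    private int total; // number of rows currently kept

    TopNWithTies(int n) {
        if (n < 1) throw new IllegalArgumentException("n must be >= 1");
        this.n = n;
    }

    void add(K key, R row) {
        List<R> tied = rowsByKey.get(key);
        if (tied != null) {
            tied.add(row); // joins an existing peer group
        } else if (total < n || key.compareTo(keys.peek()) > 0) {
            tied = new ArrayList<>();
            tied.add(row);
            rowsByKey.put(key, tied);
            keys.add(key);
        } else {
            return; // at least n kept rows are strictly greater: rank() > n
        }
        total++;
        // rank = 1 + (rows with a strictly greater key). Evict the lowest
        // peer group while its rank would exceed n.
        while (total - rowsByKey.get(keys.peek()).size() >= n) {
            K lowest = keys.poll();
            total -= rowsByKey.remove(lowest).size();
        }
    }

    // Kept rows in ORDER BY key DESC; tied rows keep insertion order.
    List<R> result() {
        List<K> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.reverseOrder());
        List<R> out = new ArrayList<>();
        for (K k : sorted) out.addAll(rowsByKey.get(k));
        return out;
    }
}
```

With the input 1, 9, 7, 9, 5, 10, 7 and `n = 4` it keeps 10, 9, 9, 7, 7; with `n = 3` it keeps 10, 9, 9. Memory is not bounded by `n`: every row tied with the lowest kept key has to be retained.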
Any thoughts?
This seems reasonable. The amount of memory required is potentially unbounded, but the same is true for window functions. The heap likely has more size overhead, but this shouldn’t matter except for pathological cases. We should have a config and session property to allow disabling the optimization.
This could also be done for dense_rank.
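For `dense_rank()` the eviction rule is simpler: keep every row whose key is among the `n` largest distinct keys. A hypothetical Java sketch under the same assumptions (one instance per partition, `ORDER BY key DESC`); `TopNDistinctKeys` is an illustrative name, not an existing class.

```java
import java.util.*;

// Hypothetical sketch: keeps the rows with dense_rank() <= n, i.e. all rows
// whose key is among the n largest distinct keys (ORDER BY key DESC).
final class TopNDistinctKeys<K extends Comparable<K>, R> {
    private final int n;
    private final Map<K, List<R>> rowsByKey = new HashMap<>();
    private final PriorityQueue<K> keys = new PriorityQueue<>(); // min-heap

    TopNDistinctKeys(int n) {
        if (n < 1) throw new IllegalArgumentException("n must be >= 1");
        this.n = n;
    }

    void add(K key, R row) {
        List<R> tied = rowsByKey.get(key);
        if (tied == null) {
            if (keys.size() == n && key.compareTo(keys.peek()) < 0) {
                return; // n larger distinct keys already kept: dense_rank() > n
            }
            tied = new ArrayList<>();
            rowsByKey.put(key, tied);
            keys.add(key);
            if (keys.size() > n) {
                rowsByKey.remove(keys.poll()); // drop the smallest key with all its rows
            }
        }
        tied.add(row);
    }

    // Kept rows in ORDER BY key DESC; tied rows keep insertion order.
    List<R> result() {
        List<K> sorted = new ArrayList<>(keys);
        sorted.sort(Comparator.reverseOrder());
        List<R> out = new ArrayList<>();
        for (K k : sorted) out.addAll(rowsByKey.get(k));
        return out;
    }
}
```

With `n = 3` and the input 1, 9, 7, 9, 5, 10, 7 it keeps 10, 9, 9, 7, 7, the rows with `dense_rank() <= 3`.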
> We should have a config and session property to allow disabling the optimization.
There are such properties already; see `optimize_top_n_row_number`.
The problem with this optimization is that people use `row_number()` to implement versioning of data rows, so you can create a view that selects the newest version of each row, e.g.:
```sql
SELECT *
FROM (
    SELECT ..., row_number() OVER (PARTITION BY row_id ORDER BY update_date DESC) AS version
    FROM ...
)
WHERE version = 1
```
In such a case, `TopNOperator` causes huge memory overhead, since `row_id` has extremely high cardinality and each row has only a few updates.
We've seen multiple customers hit this issue, which is why we introduced the toggle.
In fact, I'm thinking about disabling `optimize_top_n_row_number` by default. @electrum did you hit similar issues at FB?
Do you have a query that reproduces the issue? `optimize_top_n_row_number` introduces a TopN operator in both the leaf fragment and the intermediate fragment. Which one causes the memory overhead? @sopel39
> Which one causes the memory overhead?
The final one, which is long-lived.
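Running the TopN step in the leaf fragment as well as in the final one should also be safe for the proposed `rank()` variant, assuming it is planned like the `row_number()` query: a row's rank within one split can never exceed its global rank, because every row that is strictly greater within the split is also strictly greater overall, so the partial step never drops a row the final step would keep. A small hypothetical Java check of that argument, using a simple sort-based reference filter rather than Presto's operators:

```java
import java.util.*;

final class TwoPhaseRankCheck {
    // Reference filter: keeps values whose rank() under ORDER BY value DESC
    // is <= n, where rank = 1 + (number of values strictly greater).
    static List<Integer> rankAtMost(List<Integer> values, int n) {
        List<Integer> sorted = new ArrayList<>(values);
        sorted.sort(Comparator.reverseOrder());
        List<Integer> kept = new ArrayList<>();
        for (Integer v : sorted) {
            // In descending order, the first index of v's peer group equals
            // the number of strictly greater values.
            int strictlyGreater = sorted.indexOf(v);
            if (strictlyGreater + 1 <= n) kept.add(v);
        }
        return kept;
    }

    // Partial step per split (leaf fragment), then the final step over the union.
    static List<Integer> twoPhase(List<List<Integer>> splits, int n) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> split : splits) partials.addAll(rankAtMost(split, n));
        return rankAtMost(partials, n);
    }
}
```

Splitting 1, 9, 7, 9, 5, 10, 7 into [1, 9, 7] and [9, 5, 10, 7] and applying `twoPhase` gives the same rows as filtering all values at once, for every `n` from 1 to 7.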
> In such a case, `TopNOperator` causes huge memory overhead, since `row_id` has extremely high cardinality and each row has only a few updates.
Why doesn't `WindowOperator` have this issue when `optimize_top_n_row_number` is off? Does this imply that `TopNOperator` could be improved? @sopel39
> Why doesn't `WindowOperator` have this issue when `optimize_top_n_row_number` is off?
`WindowOperator` keeps entire pages, so the per-row overhead is minimal. `TopNOperator`, on the other hand, keeps a heap of rows for each group, which introduces huge overhead when there are tons of groups and few rows per group.
It might be possible to improve `TopNOperator`, though I haven't investigated that deeply.
@sopel39 Are you OK with the proposed optimization behind a feature flag? Are there other concerns? Our POC shows promising results for both execution time and memory. Could the cost-based optimizer (CBO) decide which execution strategy to use?
@qqibrow Such a feature flag would be desirable.
Just giving a +1 here: I can't use `rank()` in some queries due to excessive memory usage, and the workaround of multiple joins is slow and error-prone.
I talked with @findepi in the Slack channel about this problem, and he pointed me to this issue.