Presto: Optimize rank function execution

Created on 3 Jul 2019 · 9 comments · Source: prestosql/presto

When row_number() is combined with a limit on its value (a filter on the row number such as <= 5), Presto uses a TopN algorithm (the TopNRowNumber operator) instead of a full sort, so such queries run faster and use less memory.

Explain (type distributed) select * from 
( SELECT 
orderkey, 
clerk, 
totalprice, 
row_number() OVER (PARTITION BY clerk ORDER BY totalprice DESC) AS rnk 
FROM orders ) 
where rnk <= 5;

row_number Query Plan

Fragment 0 [SINGLE]
     Output layout: [orderkey, clerk, totalprice, row_number_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey, clerk, totalprice, rnk] => [[orderkey, clerk, totalprice, row_number_2]]
             Cost: ?, Output: ? rows (?B)
             rnk := row_number_2
         - RemoteSource[1] => [[orderkey, totalprice, clerk, row_number_2]]
                 Cost: ?, Output: ? rows (?B)

 Fragment 1 [HASH]
     Output layout: [orderkey, totalprice, clerk, row_number_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Project[] => [[orderkey, totalprice, clerk, row_number_2]]
             Cost: ?, Output: ? rows (?B)
         - **TopNRowNumber**[partition by (clerk), order by (totalprice DESC_NULLS_LAST) limit 5][$hashvalue] => [[orderkey, totalprice, clerk, $hashvalue, row_number_2]]
                 Cost: ?, Output: ? rows (?B)
                 row_number_2 := row_number()
             - LocalExchange[HASH][$hashvalue] ("clerk") => [[orderkey, totalprice, clerk, $hashvalue]]
                     Cost: ?, Output: ? rows (?B)
                 - RemoteSource[2] => [[orderkey, totalprice, clerk, $hashvalue_34]]
                         Cost: ?, Output: ? rows (?B)

 Fragment 2 [tpch:orders:15000]
     Output layout: [orderkey, totalprice, clerk, $hashvalue_35]
     Output partitioning: HASH [clerk][$hashvalue_35]
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - **TopNRowNumber**[partition by (clerk), order by (totalprice DESC_NULLS_LAST) limit 5][$hashvalue_35] => [[orderkey, totalprice, clerk, $hashvalue_3
             Cost: ?, Output: ? rows (?B)
             row_number_2 := row_number()
         - ScanProject[table = tpch:orders:sf0.01, grouped = false] => [[orderkey, totalprice, clerk, $hashvalue_35]]
                 Cost: ?, Output: ? rows (?B)
                 $hashvalue_35 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("clerk"), 0))
                 clerk := tpch:clerk
                 orderkey := tpch:orderkey
                 totalprice := tpch:totalprice
                 tpch:orderstatus
                     :: [[F], [O], [P]]
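The core of the TopNRowNumber idea can be sketched in a few lines. The Python below is a minimal illustration, not Presto's operator: the function name top_n_row_number, the tuple row format and tie-breaking by arrival order are all assumptions. For ORDER BY ... DESC it keeps a min-heap of at most n entries per partition, so a new row only replaces the current worst entry when it sorts ahead of it.

```python
import heapq
from collections import defaultdict
from itertools import count


def top_n_row_number(rows, partition_key, order_key, n):
    """Sketch of row_number() OVER (PARTITION BY ... ORDER BY ... DESC) <= n without a full sort."""
    if n <= 0:
        return []
    heaps = defaultdict(list)  # partition value -> min-heap of (order value, tie-breaker, row)
    arrival = count()          # unique tie-breaker, so rows themselves are never compared
    for row in rows:
        heap = heaps[partition_key(row)]
        entry = (order_key(row), -next(arrival), row)  # among ties, earlier rows win
        if len(heap) < n:
            heapq.heappush(heap, entry)
        elif entry > heap[0]:
            heapq.heapreplace(heap, entry)  # drop the current worst entry
    result = []
    for part, heap in heaps.items():
        for number, (_, _, row) in enumerate(sorted(heap, reverse=True), start=1):
            result.append((part, number, row))
    return result
```

Each partition holds at most n rows, and because a partition's overall top n is contained in the union of the local top n results, the same step can run as a partial operator in Fragment 2 and again after the hash exchange in Fragment 1, as in the plan above. The number of heaps, however, grows with the number of distinct partition values.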

Similar rank() queries are currently not optimized, because rank() output differs from row_number() output: rows that compare equal (ties) receive the same rank.
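The difference in tie handling is easiest to see by computing all three ranking functions over one ordered list. This Python sketch (the helper name rank_values is hypothetical) sorts values descending, as ORDER BY totalprice DESC would, and reports row_number, rank and dense_rank for each row, using the same sample input as the example further down.

```python
def rank_values(values):
    """Return (value, row_number, rank, dense_rank) tuples for values ordered DESC."""
    result = []
    rank = dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(sorted(values, reverse=True), start=1):
        if value != previous:
            # a new distinct value: rank jumps to the row position, dense_rank advances by one
            rank = row_number
            dense_rank += 1
            previous = value
        result.append((value, row_number, rank, dense_rank))
    return result


for row in rank_values([1, 9, 7, 9, 5, 10, 7]):
    print(row)
```

The two 9s share rank 2, so both 7s get rank 4 under rank() but 3 under dense_rank(), while row_number() simply counts positions 1 to 7.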

Explain (type distributed) select * from 
( SELECT 
orderkey, 
clerk, 
totalprice, 
rank() OVER (PARTITION BY clerk ORDER BY totalprice DESC) AS rnk 
FROM orders ) 
where rnk <= 5;

rank Query Plan

Fragment 0 [SINGLE]
     Output layout: [orderkey, clerk, totalprice, rank]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Output[orderkey, clerk, totalprice, rnk] => [[orderkey, clerk, totalprice, rank]]
             Cost: ?, Output: ? rows (?B)
             rnk := rank
         - RemoteSource[1] => [[orderkey, totalprice, clerk, rank]]
                 Cost: ?, Output: ? rows (?B)

 Fragment 1 [HASH]
     Output layout: [orderkey, totalprice, clerk, rank]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - Filter[filterPredicate = ("rank" <= BIGINT '5')] => [[orderkey, totalprice, clerk, rank]]
             Cost: ?, Output: ? rows (?B)
         - Project[] => [[orderkey, totalprice, clerk, rank]]
                 Cost: ?, Output: ? rows (?B)
             - Window[partition by (clerk), order by (totalprice DESC_NULLS_LAST)][$hashvalue] => [[orderkey, totalprice, clerk, $hashvalue, rank]]
                     Cost: ?, Output: ? rows (?B)
                     rank := rank() RANGE UNBOUNDED_PRECEDING CURRENT_ROW
                 - LocalExchange[HASH][$hashvalue] ("clerk") => [[orderkey, totalprice, clerk, $hashvalue]]
                         Estimates: {rows: 15000 (483.40kB), cpu: 1845000.00, memory: 0.00, network: 495000.00}
                         Cost: ?, Output: ? rows (?B)
                     - RemoteSource[2] => [[orderkey, totalprice, clerk, $hashvalue_33]]
                             Cost: ?, Output: ? rows (?B)

 Fragment 2 [tpch:orders:15000]
     Output layout: [orderkey, totalprice, clerk, $hashvalue_34]
     Output partitioning: HASH [clerk][$hashvalue_34]
     Stage Execution Strategy: UNGROUPED_EXECUTION
     - ScanProject[table = tpch:orders:sf0.01, grouped = false] => [[orderkey, totalprice, clerk, $hashvalue_34]]
             Estimates: {rows: 15000 (483.40kB), cpu: 360000.00, memory: 0.00, network: 0.00}/{rows: 15000 (483.40kB), cpu: 855000.00, memory: 0.00, ne
             Cost: ?, Output: ? rows (?B)
             $hashvalue_34 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("clerk"), 0))
             clerk := tpch:clerk
             orderkey := tpch:orderkey
             totalprice := tpch:totalprice
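For comparison, the Window + Filter plan above buffers every row of a partition, sorts it, assigns rank(), and only afterwards discards rows with rank > 5. A minimal Python sketch of that strategy (the function name and tuple rows are hypothetical) shows why its memory grows with the partition size rather than with n; it can also serve as a reference result when checking a top-N-with-ties alternative.

```python
from collections import defaultdict


def window_then_filter_rank(rows, partition_key, order_key, n):
    """Sketch of Window[rank() ... ORDER BY key DESC] followed by Filter[rank <= n]."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[partition_key(row)].append(row)  # every input row is buffered

    result = []
    for part, part_rows in partitions.items():
        part_rows.sort(key=order_key, reverse=True)  # full sort of the partition
        rank, previous = 0, None
        for position, row in enumerate(part_rows, start=1):
            key = order_key(row)
            if position == 1 or key != previous:
                rank, previous = position, key  # ties keep the rank of the first tied row
            if rank > n:
                break  # sorted input: every remaining row also has rank > n
            result.append((part, rank, row))
    return result
```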

A "rank with a limit" query could actually be optimized the same way, by replacing the TopNRowNumber step in the row_number() plan with a step that outputs the top N while allowing duplicates (ties):

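input: 1, 9, 7, 9, 5, 10, 7
top 3 using a heap (row_number() <= 3): 10, 9, 9
top 3 allowing duplicates (dense_rank() <= 3): 10, 9, 9, 7, 7
rank() <= 3 returns 10, 9, 9 (ranks 1, 2, 2; both 7s get rank 4), and rank() <= 2 also returns 10, 9, 9, where a plain top 2 heap would return only 10, 9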

Top N with duplicates allowed could be implemented with a map plus a heap. Such an optimization avoids the full sort in WindowOperator and makes the query run faster with less memory.
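A minimal Python sketch of the map + heap idea for a single partition, assuming ORDER BY key DESC and comparable keys; the class name TopNWithTies and the dense flag are hypothetical. The map groups retained rows by key, and a min-heap over the distinct keys exposes the current worst key, which is the only candidate for eviction. With dense=False it keeps exactly the rows with rank() <= n; with dense=True it keeps the rows with dense_rank() <= n, which reproduces the 10, 9, 9, 7, 7 output of the example above.

```python
import heapq


class TopNWithTies:
    """Keeps the rows of one partition whose rank() (or dense_rank()) is <= n, ORDER BY key DESC."""

    def __init__(self, n, dense=False):
        self.n = n
        self.dense = dense
        self.rows_by_key = {}  # map: key -> retained rows with that key
        self.heap = []         # min-heap of distinct retained keys; heap[0] is the worst key
        self.total = 0         # number of retained rows

    def _worst_is_out(self):
        if self.dense:
            return len(self.heap) > self.n  # more than n distinct values retained
        # rank of the worst rows = 1 + number of retained rows with a strictly better key
        worst_count = len(self.rows_by_key[self.heap[0]])
        return self.total - worst_count >= self.n

    def add(self, key, row):
        if key in self.rows_by_key:
            self.rows_by_key[key].append(row)
        else:
            heapq.heappush(self.heap, key)
            self.rows_by_key[key] = [row]
        self.total += 1
        # evict whole groups of the worst key while their rank exceeds n
        while self.heap and self._worst_is_out():
            worst = heapq.heappop(self.heap)
            self.total -= len(self.rows_by_key.pop(worst))

    def rows(self):
        return [row for key in sorted(self.rows_by_key, reverse=True) for row in self.rows_by_key[key]]


values = [1, 9, 7, 9, 5, 10, 7]
for n, dense in [(3, False), (2, False), (3, True)]:
    top = TopNWithTies(n, dense)
    for v in values:
        top.add(v, v)
    print(n, dense, top.rows())
# 3 False [10, 9, 9]
# 2 False [10, 9, 9]
# 3 True [10, 9, 9, 7, 7]
```

The n = 2 case shows the boundary behaviour that separates rank() from row_number(): both 9s are kept because they tie for rank 2. A real operator would also reject a new key immediately when it is worse than heap[0] and the result is already full; the sketch pushes and then evicts it, which gives the same result.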

Any thoughts?

All 9 comments

This seems reasonable. The amount of memory required is potentially unbounded, but the same is true for window functions. The heap likely has more size overhead, but this shouldn’t matter except for pathological cases. We should have a config and session property to allow disabling the optimization.

This could also be done for dense_rank.

> We should have a config and session property to allow disabling the optimization.

There are such properties, see optimize_top_n_row_number.

The problem with this optimization is that people use row_number() to implement versioning of data rows, so that they can create a view that selects the newest data, e.g.:

SELECT *
FROM
(SELECT ..., row_number() OVER (PARTITION BY row_id ORDER BY update_date DESC) AS version FROM ...)
WHERE version = 1

In such cases TopNOperator causes a huge memory overhead, since row_id is highly distinct and each row has only a few updates.
We've seen multiple customers hit this issue, so we introduced the toggle.
In fact, I'm thinking about disabling optimize_top_n_row_number by default. @electrum did you hit similar issues at FB?

Do you have a query that reproduces this issue? optimize_top_n_row_number introduces a TopNOperator in both the leaf fragment and the intermediate fragment. Which one causes the memory overhead? @sopel39

> Which one causes the memory overhead?

The final one, which is long-lived.

> In such cases TopNOperator causes a huge memory overhead, since row_id is highly distinct and each row has only a few updates.

Why doesn't WindowOperator have this issue when optimize_top_n_row_number is off? Does this imply that TopNOperator could be improved? @sopel39

> Why doesn't WindowOperator have this issue when optimize_top_n_row_number is off?

WindowOperator keeps entire pages, so the per-row overhead is minimal. TopNOperator, on the other hand, keeps a heap of rows for each group, which introduces a huge overhead when there are many groups with few rows per group.
It might be possible to improve TopNOperator, but I haven't investigated that in depth.

@sopel39 Are you OK with the proposed optimization behind a feature flag? Are there other concerns? Our POC shows promising results for both execution time and memory. Is it possible to use the CBO (cost-based optimizer) to decide which execution strategy to use?

@qqibrow Such a feature flag would be desirable.

Just giving a +1 here: I can't use rank() in some queries due to excessive memory usage, and the workaround of multiple joins is slow and error-prone.

I talked with @findepi in the Slack channel about this problem, and he pointed me to this issue.
