Is this a bug?
Is this intentional to avoid skew and low parallelism? If so, what's the long term plan?
-- This issue affects bucket-by-bucket execution.
-- However, the issue itself is unrelated to bucket-by-bucket execution.
-- Bucket-by-bucket execution also isn't necessary to reproduce it.
set session colocated_join=true;
set session grouped_execution_for_aggregation=true;
set session concurrent_lifespans_per_task=1;
-- Use legacy mark-distinct strategy
-- (Doesn't reproduce with new strategy)
presto:di> set session use_mark_distinct=true;
presto:di> create table test_hjin_md with (partitioned_by=array['ds'], bucketed_by=array['x'], bucket_count=5) as select orderkey x, orderstatus status, '2019-01-01' ds from tpch.tiny.orders;
CREATE TABLE: 15000 rows
presto:di> explain (type distributed) select x, count(status), array_agg(distinct status) from test_hjin_md group by x;
Query Plan
----------------------------------------------------------------------------------------------------------------------------------------------------------------
Fragment 0 [SINGLE]
Output layout: [x, count, array_agg]
Output partitioning: SINGLE []
Execution Flow: UNGROUPED_EXECUTION
- Output[x, _col1, _col2] => [x:bigint, count:bigint, array_agg:array(varchar)]
_col1 := count
_col2 := array_agg
- RemoteSource[1] => [x:bigint, count:bigint, array_agg:array(varchar)]
Fragment 1 [HASH]
Output layout: [x, count, array_agg]
Output partitioning: SINGLE []
Execution Flow: UNGROUPED_EXECUTION
- Aggregate(FINAL)[x] => [x:bigint, count:bigint, array_agg:array(varchar)]
count := "count"("count_14")
array_agg := "array_agg"("array_agg_15")
- LocalExchange[HASH][$hashvalue] ("x") => x:bigint, count_14:bigint, array_agg_15:array(varchar), $hashvalue:bigint
- RemoteSource[2] => [x:bigint, count_14:bigint, array_agg_15:array(varchar), $hashvalue_16:bigint]
Fragment 2 [HASH]
Output layout: [x, count_14, array_agg_15, $hashvalue_20]
Output partitioning: HASH [x][$hashvalue_20]
Execution Flow: UNGROUPED_EXECUTION
- Project[] => [x:bigint, count_14:bigint, array_agg_15:array(varchar), $hashvalue_20:bigint]
$hashvalue_20 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("x"), 0))
- Aggregate(PARTIAL)[x] => [x:bigint, count_14:bigint, array_agg_15:array(varchar)]
count_14 := "count"("status")
array_agg_15 := "array_agg"("status") (mask = status$distinct)
- MarkDistinct[distinct=x:bigint, status:varchar marker=status$distinct] => [x:bigint, status:varchar, status$distinct:boolean]
- Project[] => [x:bigint, status:varchar]
- LocalExchange[HASH][$hashvalue_17] ("x", "status") => x:bigint, status:varchar, $hashvalue_17:bigint
- RemoteSource[3] => [x:bigint, status:varchar, $hashvalue_18:bigint]
Fragment 3 [prism:HivePartitioningHandle{bucketCount=5, hiveTypes=[bigint]}]
Output layout: [x, status, $hashvalue_19]
Output partitioning: HASH [x, status][$hashvalue_19]
Execution Flow: UNGROUPED_EXECUTION
- ScanProject[table = prism:di:test_hjin_md, originalConstraint = true] => [x:bigint, status:varchar, $hashvalue_19:bigint]
Cost: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}/{rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
$hashvalue_19 := "combine_hash"("combine_hash"(bigint '0', COALESCE("$operator$hash_code"("x"), 0)), COALESCE("$operator$hash_code"("status"), 0))
LAYOUT: di.test_hjin_md
x := HiveColumnHandle{name=x, hiveType=bigint, hiveColumnIndex=0, columnType=REGULAR}
status := HiveColumnHandle{name=status, hiveType=string, hiveColumnIndex=1, columnType=REGULAR}
HiveColumnHandle{name=ds, hiveType=string, hiveColumnIndex=-1, columnType=PARTITION_KEY}
:: [[2019-01-01]]
For reference, this is what the plan looks like without DISTINCT:
presto:di> explain (type distributed) select x, count(status) from test_hjin_md group by x;
Query Plan
-----------------------------------------------------------------------------------------------------------------------------
Fragment 0 [SINGLE]
Output layout: [x, count]
Output partitioning: SINGLE []
Execution Flow: UNGROUPED_EXECUTION
- Output[x, _col1] => [x:bigint, count:bigint]
_col1 := count
- RemoteSource[1] => [x:bigint, count:bigint]
Fragment 1 [prism:HivePartitioningHandle{bucketCount=5, hiveTypes=[bigint]}]
Output layout: [x, count]
Output partitioning: SINGLE []
Execution Flow: GROUPED_EXECUTION
- Aggregate(FINAL)[x] => [x:bigint, count:bigint]
Cost: {rows: ? (?), cpu: ?, memory: ?, network: 0.00}
count := "count"("count_9")
- LocalExchange[HASH][$hashvalue] ("x") => x:bigint, count_9:bigint, $hashvalue:bigint
Cost: {rows: ? (?), cpu: ?, memory: ?, network: 0.00}
- Project[] => [x:bigint, count_9:bigint, $hashvalue_10:bigint]
Cost: {rows: ? (?), cpu: ?, memory: ?, network: 0.00}
$hashvalue_10 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("x"), 0))
- Aggregate(PARTIAL)[x] => [x:bigint, count_9:bigint]
Cost: {rows: ? (?), cpu: ?, memory: ?, network: 0.00}
count_9 := "count"("status")
- TableScan[prism:di:test_hjin_md, originalConstraint = true] => [x:bigint, status:varchar]
Cost: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
LAYOUT: di.test_hjin_md
x := HiveColumnHandle{name=x, hiveType=bigint, hiveColumnIndex=0, columnType=REGULAR}
status := HiveColumnHandle{name=status, hiveType=string, hiveColumnIndex=1, columnType=REGULAR}
HiveColumnHandle{name=ds, hiveType=string, hiveColumnIndex=-1, columnType=PARTITION_KEY}
:: [[2019-01-01]]
@martint , do you intend to change the default for use_mark_distinct?
There are many queries that fail or have worse performance due to lack of parallelism when use_mark_distinct is off. We can't yet change the default.
Currently, for MarkDistinct (https://github.com/prestodb/presto/blob/054c7d2884531e119c5251478055f45158cfe622/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java#L295), the existing partitioning is not honored the way it is for the Aggregation node (https://github.com/prestodb/presto/blob/054c7d2884531e119c5251478055f45158cfe622/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java#L251).
There was a similar issue for window functions, resolved by https://github.com/prestodb/presto/pull/12122.
This prevents us from using grouped execution efficiently.
CC: @wenleix @rschlussel @shixuan-fan
I recently found that queries like
SELECT
user_id,
count(distinct item_id),
count(distinct page_id),
count(distinct ... ),
count(distinct ... ),
count(distinct ... ),
count(distinct ... )
FROM
...
GROUP BY
user_id
are very common.
Currently the high level plan for this query will be
[partition by user_id + item_id] -> [mark distinct user_id + item_id] -> [partition by user_id + page_id] -> [mark distinct user_id + page_id] -> ... -> [partition by user_id] -> [aggregate by user id]
It requires an extra shuffle at every step, which is very expensive.
In theory, the input can be partitioned once by user_id, and then all the distincts and the aggregation itself can be executed in a single stage.
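The single-stage version described above can be sketched with a toy Python model (this is illustrative, not Presto code; it assumes the input was already partitioned once by user_id, so all rows for a key are co-located, and keeps one set per (key, column) pair):

```python
from collections import defaultdict

def multi_count_distinct(rows, key, distinct_cols):
    """Toy single-stage model: compute several COUNT(DISTINCT col)
    aggregations grouped by `key` in one pass over co-located rows."""
    # One set per (group key, distinct column); this per-key state is
    # exactly the memory-skew concern discussed later in the thread.
    seen = defaultdict(lambda: {c: set() for c in distinct_cols})
    for row in rows:
        group = seen[row[key]]
        for c in distinct_cols:
            group[c].add(row[c])
    return {k: {c: len(s) for c, s in cols.items()} for k, cols in seen.items()}

rows = [
    {"user_id": 1, "item_id": 10, "page_id": 7},
    {"user_id": 1, "item_id": 10, "page_id": 8},
    {"user_id": 2, "item_id": 11, "page_id": 7},
]
result = multi_count_distinct(rows, "user_id", ["item_id", "page_id"])
# user 1 saw 1 distinct item and 2 distinct pages; user 2 saw 1 and 1.
```

The point of the sketch is that no per-distinct-column repartitioning is needed once the rows for each user_id sit on one worker.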
I think the concern @martint expressed:
There are many queries that fail or have worse performance due to lack of parallelism when use_mark_distinct is off. We can't yet change the default.
does not apply to this particular case, as the aggregation by user_id must be done at the final step anyway. So, if there's skew by user_id, the whole processing will be skewed by the final aggregation regardless.
The proposal is to change the condition here:
to
if (child.getProperties().isSingleNode() ||
        (!child.getProperties().isStreamPartitionedOn(node.getDistinctSymbols()) &&
         !child.getProperties().isNodePartitionedOn(preferredChildProperties.getGlobalProperties().get().getPartitioningProperties().get().getPartitioningColumns()))) {
(of course, with all the safety checks like preferredChildProperties.getGlobalProperties().isPresent(), etc.)
The additional check effectively verifies that if the parent's partitioning requirement is already satisfied, there is no need to change the partitioning just for the MarkDistinct.
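The intent of that condition can be modeled with a small hypothetical sketch (plain Python, not the actual AddExchanges code; `child_cols`, `distinct_cols`, and `parent_cols` stand in for the properties queried above):

```python
def needs_repartition(child_cols, distinct_cols, parent_cols):
    """Hypothetical model of the proposed check: keep the child's
    partitioning when it already satisfies either the MarkDistinct
    symbols or the parent's (e.g. the final aggregation's) requirement."""
    child = set(child_cols)
    # Partitioned on a subset of the distinct symbols: rows with equal
    # distinct symbols are already co-located (the existing check).
    on_distinct = bool(child) and child <= set(distinct_cols)
    # Partitioned on the parent's preferred columns (the added check).
    on_parent = bool(child) and child <= set(parent_cols)
    return not (on_distinct or on_parent)

# Table bucketed on user_id, MarkDistinct on (user_id, item_id),
# final aggregation on user_id: no extra exchange needed.
bucketed = needs_repartition({"user_id"}, {"user_id", "item_id"}, {"user_id"})
# Unpartitioned input still needs the exchange.
unpartitioned = needs_repartition(set(), {"user_id", "item_id"}, {"user_id"})
```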
@wenleix , @rschlussel Thoughts?
So this is indeed an interesting problem. We will label the following query shapes for convenience.
Query-Single:
SELECT custkey, COUNT(DISTINCT col)
FROM ...
GROUP BY custkey
Query-Many:
SELECT custkey,
COUNT(DISTINCT col1),
COUNT(DISTINCT col2),
...
COUNT(DISTINCT col100)
FROM ...
GROUP BY custkey
We will discuss the two different scenarios (unbucketed table vs. bucketed table):
When the underlying table is unbucketed and use_mark_distinct=true, MarkDistinct will request an added exchange. In this case, it makes sense to add the exchange over both custkey and col (think of the join and aggregate cases). Partitioning only on custkey carries a potential risk of skew or an insufficient number of distinct custkey values.
So, for Query-Many over an unbucketed table, we will see 100 remote exchanges, since none of the added exchanges are compatible with each other. This is known to be very inefficient even for remote exchange, and the situation is exacerbated further under exchange materialization.
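A toy illustration of why these exchanges are incompatible with each other (plain Python, hypothetical hash partitioning: a row's target worker depends on which columns are hashed, so each MarkDistinct over a different column pair forces a fresh shuffle):

```python
def worker_for(row, keys, n_workers=4):
    # Hash-partition a row by the given key columns. Integer values,
    # so Python's hash is deterministic across runs here.
    return hash(tuple(row[k] for k in keys)) % n_workers

rows = [{"custkey": c, "col1": c + 1, "col2": 2 * c} for c in range(100)]
by_col1 = [worker_for(r, ["custkey", "col1"]) for r in rows]
by_col2 = [worker_for(r, ["custkey", "col2"]) for r in rows]
# Rows that land on different workers under the two partitionings
# must be re-shuffled between the two MarkDistinct steps.
moved = sum(a != b for a, b in zip(by_col1, by_col2))
```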
In such cases, the discussion in this issue about not inserting the exchange won't help. We should instead use use_mark_distinct=false, introduced in https://github.com/prestodb/presto/commit/78f81465486848daa3252a8aae2ffc9e7cb9ee29. One thought is to always make users set use_mark_distinct=false if they want to try this new exchange materialization feature :)
When the input is already partitioned on custkey (e.g. the input table is bucketed), this brings the question of whether we still need to add the exchange to partition over (custkey, col). One could argue that, similar to the aggregation and window function case (https://github.com/prestodb/presto/pull/12122), we should just adopt the existing partitioning.
On the other hand, one could also argue we should just set use_mark_distinct=false.
Adaptive execution? For example, we could always start the query with use_mark_distinct=false, and only switch to use_mark_distinct=true when we find ...
In general, I find this problem bears a strong resemblance to join/aggregation skew and the question of whether to insert an additional exchange, and it might best be solved in the same framework.
Partitioning only on custkey carries a potential risk of skew or an insufficient number of distinct custkey values.
But anyhow, the final GROUP BY custkey will have to shuffle by custkey. So, if there's skew, it will be skewed on the last stage anyway.
When the input is already partitioned on custkey (e.g. the input table is bucketed), this brings the question of whether we still need to add the exchange to partition over (custkey, col).
Again, if the downstream stage is an aggregation on custkey, I don't see why we should add an extra exchange.
The only reason I can see is memory usage skew (some custkey values may have more distinct values for col). Is that the concern?
After an offline discussion with @rschlussel, we came to the conclusion that the shuffles are mainly needed to take care of memory skew when the aggregation column cardinality is low.
I think nothing needs to change in MarkDistinct; instead, use_mark_distinct=false should be used when memory skew is not a problem.
@arhimondr / @wenleix - when we run queries with multiple DISTINCTs on Presto views built from multiple UNION ALLs, the remote exchange happens in a single stage for all the distincts:
MarkDistinct[distinct=transactiondate:date, accountnumber:bigint marker=accountnumber$distinct][$hashvalue_210]
│ Layout: [origintransactionid:varchar, servedmsisdn:bigint, accountnumber:bigint, transactionamount:double, accountbalance:double, filename:varchar, transactiondate:date, $hashvalue:bigint, $hashvalue_210:bigint, $hashvalue_211:bigint, $has
│ CPU: 10.77s (10.17%), Scheduled: 13.59s (3.10%), Output: 9570301 rows (1.67GB)
│ Input avg.: 74767.98 rows, Input std.dev.: 645.51%
└─ MarkDistinct[distinct=transactiondate:date, origintransactionid:varchar marker=origintransactionid$distinct][$hashvalue_211]
│ Layout: [origintransactionid:varchar, servedmsisdn:bigint, accountnumber:bigint, transactionamount:double, accountbalance:double, filename:varchar, transactiondate:date, $hashvalue:bigint, $hashvalue_210:bigint, $hashvalue_211:bigint, $
│ CPU: 12.56s (11.86%), Scheduled: 15.32s (3.49%), Output: 9570301 rows (1.66GB)
│ Input avg.: 74767.98 rows, Input std.dev.: 645.51%
└─ MarkDistinct[distinct=transactiondate:date, servedmsisdn:bigint marker=servedmsisdn$distinct][$hashvalue_212]
│ Layout: [origintransactionid:varchar, servedmsisdn:bigint, accountnumber:bigint, transactionamount:double, accountbalance:double, filename:varchar, transactiondate:date, $hashvalue:bigint, $hashvalue_210:bigint, $hashvalue_211:bigint
│ CPU: 11.04s (10.43%), Scheduled: 13.31s (3.03%), Output: 9570301 rows (1.64GB)
│ Input avg.: 74767.98 rows, Input std.dev.: 645.51%
└─ MarkDistinct[distinct=transactiondate:date, filename:varchar marker=filename$distinct][$hashvalue_213]
│ Layout: [origintransactionid:varchar, servedmsisdn:bigint, accountnumber:bigint, transactionamount:double, accountbalance:double, filename:varchar, transactiondate:date, $hashvalue:bigint, $hashvalue_210:bigint, $hashvalue_211:big
│ CPU: 1.97s (1.86%), Scheduled: 3.02s (0.69%), Output: 9570301 rows (1.62GB)
│ Input avg.: 74767.98 rows, Input std.dev.: 645.51%
└─ LocalExchange[HASH][$hashvalue] ("transactiondate")
│ Layout: [origintransactionid:varchar, servedmsisdn:bigint, accountnumber:bigint, transactionamount:double, accountbalance:double, filename:varchar, transactiondate:date, $hashvalue:bigint, $hashvalue_210:bigint, $hashvalue_211:
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
│ CPU: 6.71s (6.34%), Scheduled: 11.00s (2.51%), Output: 9570301 rows (1.60GB)
│ Input avg.: 37383.99 rows, Input std.dev.: 312.76%
└─ RemoteSource[3]
│ Layout: [origintransactionid_5:varchar, servedmsisdn_6:bigint, accountnumber_7:bigint, transactionamount_13:double, filename_66:varchar, transactiondate_70:date, expr_73:double, $hashvalue_214:bigint, $hashvalue_215:bigint,
│ CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 0 rows (0B)
│ Input avg.: 0.00 rows, Input std.dev.: ?%
└─ RemoteSource[4]
Layout: [servedmsisdn_89:bigint, accountnumber_90:bigint, origintransactionid_101:varchar, transactionamount_106:double, accountbalance_108:double, filename_113:varchar, transactiondate_117:date, $hashvalue_229:bigint, $hash
CPU: 2.02s (1.90%), Scheduled: 4.58s (1.04%), Output: 9570301 rows (1.60GB)
Input avg.: 74767.98 rows, Input std.dev.: 209.55%
On tables, we get multiple stages for MarkDistinct.