The plan for the following query is missing a partial aggregation, so the aggregation runs very slowly on a single node. This looks like a regression caused by #10731.
explain select sum(totalprice), orderstatus from orders where orderstatus='O' group by orderstatus;
- Output[_col0, orderstatus] => [sum:double, orderstatus:varchar(1)]
        Cost: {rows: 1 (20B), cpu: 1026620.00, memory: 20.00, network: 220010.00}
        _col0 := sum
    - RemoteExchange[GATHER] => orderstatus:varchar(1), sum:double
            Cost: {rows: 1 (20B), cpu: 1026620.00, memory: 20.00, network: 220010.00}
        - Aggregate(STREAMING)[orderstatus] => [orderstatus:varchar(1), sum:double]
                Cost: {rows: 1 (20B), cpu: 1026620.00, memory: 20.00, network: 219990.00}
                sum := "sum"("totalprice")
            - LocalExchange[HASH][$hashvalue] ("orderstatus") => orderstatus:varchar(1), totalprice:double, $hashvalue:bigint
                    Cost: {rows: 7333 (214.83kB), cpu: 806630.00, memory: 0.00, network: 219990.00}
                - RemoteExchange[REPARTITION][$hashvalue_9] => orderstatus:varchar(1), totalprice:double, $hashvalue_9:bigint
                        Cost: {rows: 7333 (214.83kB), cpu: 586640.00, memory: 0.00, network: 219990.00}
                    - ScanProject[table = tpch:tpch:orders:sf0.01, originalConstraint = ("orderstatus" = 'O')] => [orderstatus:varchar(1), totalprice:double, $hashvalue_10:bigint]
                            Cost: {rows: 7333 (143.22kB), cpu: 146660.00, memory: 0.00, network: 0.00}/{rows: 7333 (214.83kB), cpu: 366650.00, memory: 0.00, network: 0.00}
                            $hashvalue_10 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("orderstatus"), 0))
                            orderstatus := tpch:orderstatus
                                :: [[O]]
                            totalprice := tpch:totalprice
This is because the PushPartialAggregationThroughExchange rule doesn't apply to streaming aggregations.
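To see why this ends up on one node: with orderstatus = 'O', every row reaching RemoteExchange[REPARTITION] carries the same key, and a hash partitioner must send equal keys to the same partition, so a single node receives every row. The sketch below is illustrative only; `partitionOf` is a hypothetical stand-in (`floorMod` of `String.hashCode()`), not Presto's `combine_hash`-based partitioning.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ConstantKeyRepartition {
    // Hypothetical partitioner, NOT Presto's hashing. Like any hash partitioner,
    // it sends equal keys to the same partition.
    static int partitionOf(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Counts the rows each partition receives for the given grouping keys.
    static Map<Integer, Integer> rowsPerPartition(List<String> keys, int partitionCount) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(partitionOf(key, partitionCount), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // After WHERE orderstatus = 'O', every row carries the same key.
        List<String> filtered = List.of("O", "O", "O", "O", "O", "O");
        // Without the filter, keys spread over several values.
        List<String> unfiltered = List.of("O", "F", "P", "O", "F", "P");
        System.out.println(rowsPerPartition(filtered, 4));   // prints {3=6}: one partition gets every row
        System.out.println(rowsPerPartition(unfiltered, 4)); // prints {0=2, 2=2, 3=2}
    }
}
```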
CC @sopel39
I'm seeing that AddExchanges#visitAggregation inserts a RemoteExchange under the AggregationNode to satisfy the partitioned_on(orderstatus) property:
- Aggregation (SINGLE, orderstatus)
    - TableScan
becomes
- Aggregation (SINGLE, orderstatus)
    - RemoteExchange (HASH, orderstatus)
        - TableScan
Then AddLocalExchanges sets AggregationNode#preGroupedSymbols to {orderstatus} based on the Constant(orderstatus) stream property, so PushPartialAggregationThroughExchange skips the aggregation.
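Why a constant key counts as pre-grouped: a streaming aggregation keeps state only for the current key and emits a group whenever the key changes, so it is correct only if each key's rows are contiguous. A stream on which every grouping key is constant satisfies that trivially. A minimal sketch; `Row`, `Group`, and `streamingSum` are hypothetical names, not Presto's operator classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

final class StreamingSumSketch {
    // Hypothetical input row: a grouping key and a value to sum.
    record Row(String key, double value) {}

    // Hypothetical output row: one key and the sum over its run of rows.
    record Group(String key, double sum) {}

    // Streaming aggregation: holds state for the current key only and flushes a
    // group whenever the key changes. Correct only if the input is grouped by key.
    static List<Group> streamingSum(List<Row> rows) {
        List<Group> out = new ArrayList<>();
        boolean hasGroup = false;
        String currentKey = null;
        double currentSum = 0;
        for (Row row : rows) {
            if (hasGroup && !Objects.equals(currentKey, row.key())) {
                out.add(new Group(currentKey, currentSum));
                currentSum = 0;
            }
            hasGroup = true;
            currentKey = row.key();
            currentSum += row.value();
        }
        if (hasGroup) {
            out.add(new Group(currentKey, currentSum));
        }
        return out;
    }

    public static void main(String[] args) {
        // Constant key (like Constant(orderstatus)): trivially grouped, one group.
        System.out.println(streamingSum(List.of(new Row("O", 1.5), new Row("O", 2.5))));
        // Ungrouped input: "O" shows up as two separate groups, which is wrong for GROUP BY.
        System.out.println(streamingSum(List.of(new Row("O", 1), new Row("F", 2), new Row("O", 3))));
    }
}
```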
A streaming aggregation over a repartitioning or gather exchange implies that all grouping keys are constant. One option is to rewrite it as a global aggregation. That would be my preference, but I think global aggregations currently don't support constant columns, so we'd have to wrap the grouping keys in arbitrary(k). Another option is to extend PushPartialAggregationThroughExchange to streaming aggregations as follows:
- Aggregation (SINGLE, STREAMING)
    - Exchange
becomes
- Aggregation (FINAL)
    - Exchange
        - Aggregation (PARTIAL, STREAMING)
We can drop streaming from the partial aggregation for simplicity.
@martint @sopel39 Thoughts?
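The split works for sum because adding up per-worker partial sums by key gives the same answer as a single-step sum, while only one partial row per key and worker crosses the exchange. A minimal sketch under simplifying assumptions: the lists of rows stand in for hypothetical workers, and the partial step is a plain hash aggregation (streaming dropped from the partial step).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartialFinalSumSketch {
    // Hypothetical row: grouping key (orderstatus) and value (totalprice).
    record Row(String key, double value) {}

    // SINGLE step: one aggregation over every row.
    static Map<String, Double> singleSum(List<Row> rows) {
        Map<String, Double> sums = new HashMap<>();
        for (Row row : rows) {
            sums.merge(row.key(), row.value(), Double::sum);
        }
        return sums;
    }

    // PARTIAL step (hash, not streaming): runs on each worker before the exchange.
    // For sum, the intermediate state is simply a per-key running sum.
    static Map<String, Double> partialSum(List<Row> workerRows) {
        return singleSum(workerRows);
    }

    // FINAL step: combines the partial states received through the exchange.
    static Map<String, Double> finalSum(List<Map<String, Double>> partials) {
        Map<String, Double> sums = new HashMap<>();
        for (Map<String, Double> partial : partials) {
            partial.forEach((key, value) -> sums.merge(key, value, Double::sum));
        }
        return sums;
    }

    public static void main(String[] args) {
        List<List<Row>> workers = List.of(
                List.of(new Row("O", 10), new Row("O", 20)),
                List.of(new Row("O", 5)),
                List.of(new Row("O", 1), new Row("O", 4)));
        List<Map<String, Double>> partials = new ArrayList<>();
        List<Row> allRows = new ArrayList<>();
        for (List<Row> rows : workers) {
            partials.add(partialSum(rows)); // one row per key leaves each worker
            allRows.addAll(rows);
        }
        System.out.println(finalSum(partials).equals(singleSum(allRows))); // prints true
    }
}
```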
> Another option is to extend PushPartialAggregationThroughExchange to streaming aggregations as follows:
Maybe we could simply always add a partial streaming aggregation, e.g. rewrite:
- Aggregation (SINGLE, STREAMING)
    - Exchange
into
- Aggregation (FINAL, STREAMING)
    - Exchange
        - Aggregation (PARTIAL, STREAMING)
Is there a case where such a rewrite is invalid? As you noted, a streaming aggregation above an exchange implies constant pre-grouped symbols.
Alternatively we could rewrite to:
- Aggregation (FINAL, STREAMING)
    - Exchange
        - Aggregation (PARTIAL, HASH)
> One option is to rewrite it as a global aggregation
A global aggregation produces a default value, which we don't want here.
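The default-value problem in concrete terms: a global aggregation returns exactly one row even when the filter removes every input row (sum over zero rows is NULL), whereas the grouped query returns no rows. The helpers below are a hypothetical model of those SQL semantics, with `null` standing for SQL NULL and input values assumed non-null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GlobalVsGroupedSketch {
    // Global aggregation (no GROUP BY): always exactly one output row.
    // sum() over zero rows is SQL NULL, modelled as a null Double.
    // Input values are assumed non-null.
    static List<Double> globalSum(List<Double> values) {
        Double sum = null;
        for (double value : values) {
            sum = (sum == null) ? value : sum + value;
        }
        List<Double> rows = new ArrayList<>();
        rows.add(sum); // the "default value" row, emitted even for empty input
        return rows;
    }

    // Grouped aggregation: one output row per key present in the input.
    static Map<String, Double> groupedSum(List<String> keys, List<Double> values) {
        Map<String, Double> rows = new TreeMap<>();
        for (int i = 0; i < keys.size(); i++) {
            rows.merge(keys.get(i), values.get(i), Double::sum);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Suppose the filter matched no orders at all.
        List<String> noKeys = List.of();
        List<Double> noValues = List.of();
        System.out.println(globalSum(noValues));                 // prints [null]: one row
        System.out.println(groupedSum(noKeys, noValues).size()); // prints 0: no rows
    }
}
```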
@sopel39
> A global aggregation produces a default value, which we don't want here.
That's right. Thanks for pointing it out.
> Maybe we could simply always add a partial streaming aggregation
That should work. It should be a pretty simple change.
@sopel39 @martint Martin and I discussed this some more. We believe the assertion that "streaming aggregation above exchange implies constant pre-grouped symbols" is true today, but it may change later, at which point we may have a regression. To properly decide whether a streaming aggregation can be split over a remote exchange, the rule needs to know the properties of the data; ideally we need traits. For now, the proposal is to update AddLocalExchanges to populate AggregationNode#preGroupedKeys only if they are not constant, i.e. if we expect multiple groups.
> We believe the assertion that "streaming aggregation above exchange implies constant pre-grouped symbols" is true today, but it may change later, at which point we may have a regression.
Still, I think the sources of a GATHER/REPARTITION exchange need to be grouped by the same symbols as the output of the exchange. In what case would this not apply?
In any case, we would catch a regression via ValidateStreamingAggregations.
> To properly decide whether a streaming aggregation can be split over a remote exchange, the rule needs to know the properties of the data; ideally we need traits
Traits would be a useful abstraction, but they are not required. ValidateStreamingAggregations, for instance, derives properties directly. In this case I'm not sure we need to check the properties of the exchange source, since we can infer that it's grouped by the same symbols as the exchange output.
> For now, the proposal is to update AddLocalExchanges to populate AggregationNode#preGroupedKeys only if they are not constant, i.e. if we expect multiple groups.
That would add slower hash exchange when grouping symbols are constant. Do we want that?
@sopel @martin
Karol, thanks for sharing your thoughts. My thinking on this has evolved a bit since yesterday.
The PushPartialAggregationThroughExchange rule works on an aggregation over an exchange:
- Aggregation (SINGLE, STREAMING)
    - Exchange (REMOTE)
The question here is whether streaming aggregation can be split over exchange.
I believe it is always safe to drop streaming and split the aggregation:
- Aggregation (FINAL)
    - Exchange (REMOTE)
        - Aggregation (PARTIAL)
This does "add slower hash exchange when grouping symbols are constant" and we'd rather avoid it.
It is also safe to not split the aggregation, but the loss of performance in this case is not acceptable.
To keep streaming aggregation for the final step, the partial step, or both, we need to either derive properties or consider all possible cases.
It is a given that the exchange output is pre-grouped. There are two possibilities: (1) the exchange input is pre-grouped and the exchange preserves that grouping; (2) the exchange itself enforces grouping (e.g. a merging exchange enforces order).
In case (1), it is safe to split the aggregation into a streaming partial and a streaming final aggregation. In case (2), we can't safely add a streaming partial aggregation, so we'll add a hash aggregation. Since we don't really know how the exchange enforces grouping, or whether it relies on some properties of its input stream to do so, we can't assume that it will be able to enforce grouping on the output of a partial hash aggregation. Therefore, we can't assume that the exchange output will be grouped, and we can't safely use a streaming final aggregation. To summarize, in case (2) the only safe option is to drop streaming when splitting.
Without deriving properties, we can't know whether we have case (1) or (2), so we have to drop streaming.
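The case analysis above reduces to a small decision table. This is a sketch only; `GroupingSource`, `Split`, and `split` are hypothetical names, and the real rule works on plan nodes rather than an enum.

```java
final class SplitDecisionSketch {
    // Hypothetical summary of why the exchange output is grouped.
    enum GroupingSource {
        INPUT_PRESERVED,   // case (1): input is pre-grouped and the exchange preserves it
        EXCHANGE_ENFORCED, // case (2): the exchange itself enforces grouping
        UNKNOWN            // properties not derived: (1) and (2) are indistinguishable
    }

    // Which halves of the split may stay streaming.
    record Split(boolean streamingPartial, boolean streamingFinal) {}

    static Split split(GroupingSource source) {
        return switch (source) {
            // Partial sees grouped input, and the exchange keeps its output grouped.
            case INPUT_PRESERVED -> new Split(true, true);
            // Partial input may not be grouped, so partial must be hash; the exchange may not
            // re-group hash-partial output, so final must be hash too. UNKNOWN must assume (2).
            case EXCHANGE_ENFORCED, UNKNOWN -> new Split(false, false);
        };
    }

    public static void main(String[] args) {
        for (GroupingSource source : GroupingSource.values()) {
            System.out.println(source + " -> " + split(source));
        }
    }
}
```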
That said, I prefer this change over updating AddLocalExchanges to not populate AggregationNode#preGroupedKeys when pre-grouped keys are constant. It seems cleaner to keep the change in the same rule whose limitation (lack of properties) requires it.
At the same time, I think it would be valuable to avoid "slower hash exchange when grouping symbols are constant". I see a couple of options to achieve that:
- Change PushPartialAggregationThroughExchange to derive properties (similar to ValidateStreamingAggregations and some other rules) and add logic to preserve streaming if the exchange input is pre-grouped.
- Convince ourselves that, in practice, we won't have case (2) above, and rely on the ValidateStreamingAggregations runtime check to catch any regressions.
My preference would be the following:
(1) submit a PR to always drop streaming when splitting to fix the regression in the upcoming release;
(2) submit a follow-up PR to derive properties in PushPartialAggregationThroughExchange and keep streaming if properties allow.
Martin, Karol, what do you think?
That seems reasonable, but "(2) submit a follow-up PR to derive properties" may be harder than it seems, or may introduce things we want to move away from. The rule-based optimizer is not set up to make it easy for rules to arbitrarily walk the plan tree (to derive properties), which is why I mentioned that we might need support for traits and first-class properties in the rule-based optimization framework before we can do that.
As to "slower hash exchange when grouping symbols are constant", we might want to measure the actual impact. It may turn out to be small enough that adding short-term hacks or unwanted complexity is not warranted.
Just to note:
> In case (2), we can't safely add a streaming partial aggregation, so we'll add a hash aggregation
> (2) the exchange itself enforces grouping (e.g. a merging exchange enforces order).
A merging exchange does preserve the grouping property, since its sources are themselves ordered.
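To illustrate the point above: a k-way merge of inputs that are each sorted on the key yields sorted output, and sorted output is grouped, whereas simply concatenating the same inputs is not. A minimal sketch with `java.util.PriorityQueue`; `merge` and `isGrouped` are hypothetical helpers, and keys are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

final class MergingExchangeSketch {
    // k-way merge of sources that are each sorted ascending (what a merging exchange does).
    static List<String> merge(List<List<String>> sortedSources) {
        // Heap entries are {sourceIndex, position}, ordered by the element they point at.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) ->
                sortedSources.get(a[0]).get(a[1]).compareTo(sortedSources.get(b[0]).get(b[1])));
        for (int s = 0; s < sortedSources.size(); s++) {
            if (!sortedSources.get(s).isEmpty()) {
                heap.add(new int[] {s, 0});
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<String> source = sortedSources.get(top[0]);
            out.add(source.get(top[1]));
            if (top[1] + 1 < source.size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return out;
    }

    // True if every key occupies a single contiguous run.
    static boolean isGrouped(List<String> keys) {
        Set<String> closed = new HashSet<>();
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            if (closed.contains(key)) {
                return false; // key reappears after its run ended
            }
            if (i + 1 < keys.size() && !keys.get(i + 1).equals(key)) {
                closed.add(key);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<String>> sources = List.of(List.of("F", "O", "O"), List.of("O", "P"), List.of("F", "P"));
        List<String> merged = merge(sources);
        List<String> concatenated = new ArrayList<>();
        sources.forEach(concatenated::addAll);
        System.out.println(merged + " grouped=" + isGrouped(merged));             // grouped=true
        System.out.println(concatenated + " grouped=" + isGrouped(concatenated)); // grouped=false
    }
}
```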