We currently don't support window functions in DistSQL queries. This means that any query which includes a window function will be forced into local SQL execution. This is a problem for OLAP-style workloads because these often include window functions when performing complex analytics.
We should fix this. The design of window function processing in DistSQL needs to be carefully thought out. To start, we want to consider how window functions can be distributed and decomposed. To get an intuition about this, we can look at how window functions are implemented in local SQL. Each function begins with a scan over all input rows in order to compute partitions. This is similar to the partitioning step in a hash join, which DistSQL already knows how to do. After this, each partition can be operated on in isolation. The per-partition work can be decomposed into two steps: a sort and an aggregation. Ideally, these two steps would re-use existing DistSQL processors. This should be possible but will require that aggregation processors be taught about window frames. Finally, we'll need to merge all of the partitions together. This again could rely on existing processors like the UNION processor and the HASH JOIN processor (depending on how many columns in each row are propagated through the window function computation steps).
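
To make the decomposition above concrete, here is a minimal single-process sketch in Go of the three phases (partition, per-partition sort plus aggregation, merge) for something like `sum(val) OVER (PARTITION BY partKey ORDER BY orderKey)`. It is only an illustration, not DistSQL processor code: the `row` and `outRow` types and all function names are hypothetical, and it assumes ordering keys are unique so the default frame reduces to a plain running sum.

```go
package main

import (
	"fmt"
	"sort"
)

// row is a hypothetical input row: partition key, ordering key, and a value.
type row struct {
	partKey  string
	orderKey int
	val      float64
}

// outRow carries the input row plus the window function result.
type outRow struct {
	row
	runningSum float64
}

// partition buckets rows by partition key. In a distributed plan this would
// correspond to hash-routing rows on the PARTITION BY columns.
func partition(rows []row) map[string][]row {
	parts := make(map[string][]row)
	for _, r := range rows {
		parts[r.partKey] = append(parts[r.partKey], r)
	}
	return parts
}

// windowSum handles one partition in isolation: sort by the ordering key,
// then aggregate over the frame "start of partition up to the current row".
func windowSum(part []row) []outRow {
	sorted := append([]row(nil), part...) // copy so the input is not reordered
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].orderKey < sorted[j].orderKey })
	out := make([]outRow, 0, len(sorted))
	sum := 0.0
	for _, r := range sorted {
		sum += r.val
		out = append(out, outRow{row: r, runningSum: sum})
	}
	return out
}

// sumOver runs all three phases and merges the per-partition results.
func sumOver(rows []row) []outRow {
	var merged []outRow
	for _, part := range partition(rows) {
		merged = append(merged, windowSum(part)...)
	}
	// Map iteration order is random, so order the merged output explicitly.
	sort.Slice(merged, func(i, j int) bool { return merged[i].orderKey < merged[j].orderKey })
	return merged
}

func main() {
	rows := []row{{"a", 1, 10}, {"b", 2, 5}, {"a", 3, 20}, {"b", 4, 1}}
	for _, r := range sumOver(rows) {
		fmt.Println(r.partKey, r.orderKey, r.runningSum)
	}
}
```

Because each call to `windowSum` touches only one partition, those calls are the part that could run on different nodes. The final merge is the only step that needs to see all of the rows again.
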
One complication of window functions is that a given query can have multiple window functions. This is similar to how a query can have multiple aggregation functions, but can get a lot more complicated because each window function can specify a different partitioning and sorting scheme. This means that we can't always combine window function aggregations. Instead, it's probably easiest to run each window function serially and not worry about multiple partitioning phases.
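
One way to picture the serial approach is to group window functions by their window definition and run one stage per distinct definition, so that functions sharing a partitioning and ordering scheme reuse a single partitioning phase. The Go sketch below is a hypothetical illustration of that grouping only; the `windowFn` and `stage` types and `planStages` are made-up names, not planner code.

```go
package main

import "fmt"

// windowFn is a hypothetical description of one window function call.
type windowFn struct {
	name        string
	partitionBy string
	orderBy     string
}

// stage is one partition+sort pass that evaluates every function sharing
// the same window definition.
type stage struct {
	partitionBy string
	orderBy     string
	fns         []string
}

// planStages groups functions by (partitionBy, orderBy), preserving the
// order in which each distinct window definition first appears. The
// resulting stages are meant to be executed one after another.
func planStages(fns []windowFn) []stage {
	index := make(map[[2]string]int)
	var stages []stage
	for _, f := range fns {
		key := [2]string{f.partitionBy, f.orderBy}
		i, ok := index[key]
		if !ok {
			i = len(stages)
			index[key] = i
			stages = append(stages, stage{partitionBy: f.partitionBy, orderBy: f.orderBy})
		}
		stages[i].fns = append(stages[i].fns, f.name)
	}
	return stages
}

func main() {
	fns := []windowFn{
		{"count(*)", "o_orderdate", ""},
		{"rank()", "o_orderpriority", "o_orderkey"},
		{"avg(o_totalprice)", "o_orderdate", ""},
		{"sum(o_custkey)", "o_orderpriority", "o_orderkey"},
		{"max(o_shippriority)", "o_orderpriority", "o_orderkey"},
	}
	for i, s := range planStages(fns) {
		fmt.Printf("stage %d: PARTITION BY %s ORDER BY %q -> %v\n", i, s.partitionBy, s.orderBy, s.fns)
	}
}
```

In this example, five window functions over two distinct window definitions collapse into two serial stages instead of five.
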
Boom!!!
Here are benchmarks on a 4-node roachprod cluster with the TPC-H dataset (after adding memory accounting and fixing some inefficiencies in the population of output rows). Performance improvements are roughly 2x or more. The numbers are even higher if a query has several window functions with the same partitioning scheme.
Please note that on a 1-node cluster, DistSQL window functions will perform worse than local execution, which is expected (the slowdown is usually around 20%).
Comparison on orders table (1500000 rows)
| Query | Local | DistSQL | Difference |
| --- | --- | --- | --- |
| SELECT count(*) OVER (PARTITION BY o_orderdate) FROM orders ORDER BY o_orderkey LIMIT 10; | 3.770270427s | 1.579288617s | -58.11% |
| SELECT sum(o_totalprice) OVER (PARTITION BY o_orderdate ORDER BY o_orderkey) FROM orders ORDER BY o_orderkey LIMIT 10; | 6.319257886s | 2.457607566s | -61.11% |
| SELECT avg(o_totalprice) OVER (PARTITION BY o_orderdate ORDER BY o_orderkey) FROM orders ORDER BY o_orderkey LIMIT 10; | 18.839226118s | 5.485205625s | -70.88% |
| SELECT avg(o_totalprice) OVER w, sum(o_custkey) OVER w FROM orders WINDOW w AS (PARTITION BY o_orderdate ORDER BY o_orderkey) ORDER BY o_orderkey LIMIT 10; | 22.03540549s | 6.03339383s | -72.62% |
| SELECT count(*) OVER w1, rank() OVER w2, avg(o_totalprice) OVER w1, sum(o_custkey) OVER w2, max(o_shippriority) OVER w2 FROM orders WINDOW w1 AS (PARTITION BY o_orderdate), w2 AS (PARTITION BY o_orderpriority ORDER BY o_orderkey) ORDER BY o_orderkey LIMIT 10; | 29.909022153s | 10.799945246s | -63.89% |
Comparison on lineitem table (6001215 rows)
| Query | Local | DistSQL | Difference |
| --- | --- | --- | --- |
| SELECT count(*) OVER (PARTITION BY l_shipdate) FROM lineitem ORDER BY l_orderkey LIMIT 10; | 15.852753941s | 8.356666513s | -47.29% |
| SELECT sum(l_quantity) OVER (PARTITION BY l_shipdate ORDER BY l_orderkey) FROM lineitem ORDER BY l_orderkey LIMIT 10; | 28.248049396s | 11.769740243s | -58.33% |
| SELECT avg(l_quantity) OVER (PARTITION BY l_shipdate ORDER BY l_orderkey) FROM lineitem ORDER BY l_orderkey LIMIT 10; | 77.418174762s | 23.919147975s | -69.10% |
| SELECT avg(l_quantity) OVER w, sum(l_linenumber) OVER w FROM lineitem WINDOW w AS (PARTITION BY l_receiptdate ORDER BY l_orderkey) ORDER BY l_orderkey LIMIT 10; | OOM error | 26.874768298s | -infinity% |
| SELECT row_number() OVER w1, sum(l_discount) OVER w2, last_value(l_comment) OVER w1, min(l_suppkey) OVER w2 FROM lineitem WINDOW w1 AS (PARTITION BY l_shipmode ORDER BY l_orderkey), w2 AS (PARTITION BY l_commitdate) ORDER BY l_orderkey LIMIT 10; | OOM error | 34.302523343s | -infinity% |
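
For reading the tables, the Difference column appears to be the relative change in run time from Local to DistSQL, i.e. `(DistSQL - Local) / Local`. The short Go sketch below checks that reading against the first `orders` row; the function name `pctChange` is made up for illustration. Rows where local execution hit an OOM error have no finite ratio to compute.

```go
package main

import "fmt"

// pctChange returns the relative change from the local run time to the
// DistSQL run time, in percent. Negative values mean DistSQL was faster.
// Both arguments are run times in seconds.
func pctChange(localSecs, distSecs float64) float64 {
	return (distSecs - localSecs) / localSecs * 100
}

func main() {
	// First row of the orders table: count(*) OVER (PARTITION BY o_orderdate).
	fmt.Printf("%.2f%%\n", pctChange(3.770270427, 1.579288617)) // prints -58.11%
}
```

A difference of -58.11% corresponds to a speedup of about 2.4x, since the DistSQL run takes about 42% of the local run time.
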
@yuzefovich is a stud. thank you so much for this.