xumingming opened a new pull request, #56777:
URL: https://github.com/apache/spark/pull/56777
### What changes were proposed in this pull request?
Adds a new SQL config spark.sql.execution.bypassPartialAggregation (default
false). When set to true, planAggregateWithoutDistinct skips the pre-shuffle
Partial-mode aggregation and runs a single Complete-mode aggregation after the
shuffle instead. This can improve performance when group cardinality is high
and the pre-shuffle reduction ratio is low.
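As a usage sketch, assuming a build that includes this patch and the orders table used in the examples later in this description, the config could be toggled per session and its effect checked with EXPLAIN:

```sql
-- Assumption: a Spark build containing this PR; the config is only recognized with the patch applied.
SET spark.sql.execution.bypassPartialAggregation=true;

-- orders, user_id and amount are the illustrative names from the examples in this description.
EXPLAIN
SELECT user_id, SUM(amount)
FROM orders
GROUP BY user_id;
```

With the bypass active, the physical plan would be expected to show a single aggregate above the Exchange instead of a partial aggregate below it and a final one above it. The exact operator text is not reproduced here.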
The bypass is suppressed when a session_window grouping key is present,
since MergingSessionsExec must be inserted in the Partial+Merge+Final path to
correctly merge overlapping sessions.
The config has no effect on queries containing DISTINCT aggregate functions,
where the partial aggregation phases are required for correctness and are
always applied.
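To illustrate the two exceptions above, the following queries would be expected to keep their usual plans even with the config set to true. The events table and its columns are hypothetical; orders is borrowed from the examples in this description.

```sql
SET spark.sql.execution.bypassPartialAggregation=true;

-- session_window grouping key: per this description the bypass is suppressed
-- and MergingSessionsExec is still planned.
SELECT user_id,
       session_window(event_time, '5 minutes') AS sw,
       COUNT(*) AS events_in_session
FROM events  -- hypothetical table with a TIMESTAMP column event_time
GROUP BY user_id, session_window(event_time, '5 minutes');

-- DISTINCT aggregate: per this description the partial phases are always applied,
-- so the config has no effect.
SELECT user_id, COUNT(DISTINCT order_id)
FROM orders
GROUP BY user_id;
```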
### Why are the changes needed?
The standard two-phase aggregation plan (Partial → shuffle → Final) assumes
that pre-shuffle partial aggregation meaningfully reduces data volume. This
assumption breaks down in two scenarios.
**Scenario 1: High group cardinality.** When group cardinality is high
relative to partition size, nearly every input row in a partition maps to a
distinct key, so the partial aggregation emits close to one output row per
input row. It adds CPU and memory overhead with little or no shuffle benefit.
```sql
SELECT user_id, SUM(amount), COUNT(order_id), AVG(price)
FROM orders
GROUP BY user_id -- high-cardinality key: millions of distinct users
```
On a table with 500M rows and 200M distinct user_id values, the pre-shuffle
HashAggregateExec in Partial mode processes the full dataset, spills when
the hash map overflows, and still emits at least ~200M rows into the shuffle
(one per distinct key, and more when a key's rows are spread across several
input partitions). The partial phase costs wall-clock time and memory without
a comparable reduction in shuffle write volume.
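One rough way to judge whether a query falls into this scenario is to compare the distinct-key count with the row count. The sketch below uses the built-in approx_count_distinct function to keep the check cheap; it is a diagnostic only and not part of this patch.

```sql
-- Table-wide bound on what partial aggregation could save:
-- a ratio near 1 means almost no reduction is possible.
SELECT
  COUNT(*)                                  AS total_rows,
  approx_count_distinct(user_id)            AS distinct_keys,
  approx_count_distinct(user_id) / COUNT(*) AS keys_per_row  -- 200M / 500M = 0.4 for the table described above
FROM orders;
```

This is a table-wide ratio. The reduction actually achieved before the shuffle depends on how many rows of the same key land in the same input partition, so the per-partition ratio can be much closer to 1.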
**Scenario 2: Skewed input data.** Even when partial aggregation can reduce
data volume on average, skewed input partitions can make it harmful. If one
partition contains a disproportionate share of rows for a small number of keys,
the partial HashAggregateExec on that partition must hold a large hash map in
memory, triggering spills. The skewed partition becomes the bottleneck and
dominates wall-clock time, which is worse than shuffling the data first and
aggregating it once it is already partitioned and evenly distributed.
```sql
SELECT country_code, SUM(revenue)
FROM orders
GROUP BY country_code -- a few dominant countries hold 80% of rows
```
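A quick way to see whether the input is skewed in the way described is to count rows per input partition and per key. The sketch below relies on the built-in spark_partition_id() function; these queries are diagnostics only and are not part of this patch.

```sql
-- Rows per input partition: a few partitions far above the rest indicate partition skew.
SELECT partition_id, COUNT(*) AS row_count
FROM (SELECT spark_partition_id() AS partition_id FROM orders) AS t
GROUP BY partition_id
ORDER BY row_count DESC
LIMIT 10;

-- Rows per key: shows which country_code values dominate.
SELECT country_code, COUNT(*) AS row_count
FROM orders
GROUP BY country_code
ORDER BY row_count DESC
LIMIT 10;
```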
### Does this PR introduce _any_ user-facing change?
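No change in default behavior. The PR adds the new config
spark.sql.execution.bypassPartialAggregation, which defaults to false.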
### How was this patch tested?
Added unit test coverage.
### Was this patch authored or co-authored using generative AI tooling?
No.