xumingming opened a new pull request, #56777:
URL: https://github.com/apache/spark/pull/56777

   
   ### What changes were proposed in this pull request?
   
   Adds a new SQL config spark.sql.execution.bypassPartialAggregation (default 
false). When set to true, planAggregateWithoutDistinct skips the pre-shuffle 
Partial-mode aggregation and runs a single Complete-mode aggregation after the 
shuffle instead. This can improve performance when group cardinality is high 
and the pre-shuffle reduction ratio is low.
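   The sketch below shows one way to try the new config from a Spark SQL session. It assumes the `orders` table from the examples below exists and that the flag can be toggled per session with `SET`, like other `spark.sql.*` configs. Comparing the two `EXPLAIN` outputs should show whether a partial aggregate (functions prefixed `partial_`) still appears below the `Exchange`. The exact plan text varies by Spark version.

   ```sql
   -- Default behavior: Partial aggregate -> Exchange -> Final aggregate
   SET spark.sql.execution.bypassPartialAggregation=false;
   EXPLAIN
   SELECT user_id, SUM(amount)
   FROM orders
   GROUP BY user_id;

   -- With the bypass enabled (config added by this PR, default false):
   -- expect a single Complete-mode aggregate after the Exchange
   SET spark.sql.execution.bypassPartialAggregation=true;
   EXPLAIN
   SELECT user_id, SUM(amount)
   FROM orders
   GROUP BY user_id;
   ```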
   
   The bypass is suppressed when a session_window grouping key is present, 
since MergingSessionsExec must be inserted in the Partial+Merge+Final path to 
correctly merge overlapping sessions.
   
   The config has no effect on queries containing DISTINCT aggregate functions, 
where the partial aggregation phases are required for correctness and are 
always applied.
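   The two queries below illustrate the cases that, per the description above, keep the usual multi-phase plan even with the flag on. The `events` table and its `user_id` and `event_time` columns are hypothetical. `session_window` and `COUNT(DISTINCT ...)` are standard Spark SQL.

   ```sql
   SET spark.sql.execution.bypassPartialAggregation=true;

   -- session_window grouping key: the bypass is suppressed so that
   -- MergingSessionsExec can still merge overlapping sessions
   -- (events / event_time are hypothetical names)
   SELECT user_id, session_window.start, session_window.end, COUNT(*) AS cnt
   FROM events
   GROUP BY user_id, session_window(event_time, '5 minutes');

   -- DISTINCT aggregate: planned outside planAggregateWithoutDistinct,
   -- so the config has no effect
   SELECT user_id, COUNT(DISTINCT order_id)
   FROM orders
   GROUP BY user_id;
   ```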
   
   ### Why are the changes needed?
   
   The standard two-phase aggregation plan (Partial → shuffle → Final) assumes 
that pre-shuffle partial aggregation meaningfully reduces data volume. This 
assumption breaks down in two scenarios.
   
   **Scenario 1: High group cardinality.** When group cardinality is high 
relative to partition size, nearly every input row in a partition has a 
distinct key. The partial aggregation therefore emits about one output row per 
input row and adds CPU and memory overhead with little or no shuffle benefit.
   
   ```sql
   SELECT user_id, SUM(amount), COUNT(order_id), AVG(price)
   FROM orders
   GROUP BY user_id   -- high-cardinality key: millions of distinct users
   ```
   
   On a table with 500M rows and 200M distinct user_id values, the pre-shuffle 
HashAggregateExec in Partial mode churns through the full dataset, spills when 
the hash map overflows, and still emits at least 200M rows (one per distinct 
key per partition) into the shuffle. The partial phase costs wall-clock time 
and memory without meaningfully reducing shuffle write volume.
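   The following is a rough bound on the best case. The symbols are introduced here for illustration only: N is the input row count, P is the number of input partitions, d_p is the number of distinct user_id values in partition p, and D is the total number of distinct values. The partial phase emits one row per distinct key per partition, and every key appears in at least one partition. Therefore:

   ```math
   R_{\text{partial}} = \sum_{p=1}^{P} d_p \;\ge\; D = 2 \times 10^{8},
   \qquad
   \frac{N}{R_{\text{partial}}} \;\le\; \frac{N}{D} = \frac{5 \times 10^{8}}{2 \times 10^{8}} = 2.5
   ```

   Even when all of a user's rows sit in one partition, shuffle volume shrinks by at most 2.5x. When a user's rows are spread across partitions, the sum of the d_p approaches N and the ratio approaches 1, meaning no reduction.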
   
   **Scenario 2: Skewed input data.** Even when partial aggregation reduces 
data volume on average, skewed input partitions can make it harmful. If one 
input partition holds a disproportionate share of the rows, the partial 
HashAggregateExec on that partition must process all of them before anything 
is shuffled. It spills once its hash map outgrows the available memory. That 
partition becomes the straggler and dominates wall-clock time, which can be 
worse than shuffling first and aggregating the rows after they have been 
redistributed by key.
   
   ```sql
   SELECT country_code, SUM(revenue)
   FROM orders
   GROUP BY country_code   -- a few dominant countries hold 80% of rows
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   
   ### How was this patch tested?
   
   Added a unit test.
   
   ### Was this patch authored or co-authored using generative AI tooling?
    
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.