LuciferYang opened a new pull request, #55725:
URL: https://github.com/apache/spark/pull/55725

   ### What changes were proposed in this pull request?
   
   `UnionExec` now participates in whole-stage codegen on its standard 
(non-partitioning-aware) path, so the Union and all its children compile into 
one `WholeStageCodegenExec` stage instead of each child becoming its own stage.
   
   Main change — `UnionExec` implements `CodegenSupport`:
   
   - `supportCodegen` falls back when any of these hold: flag off, 
partitioning-aware output, a `UnionExec` anywhere in the subtree, a child 
subtree that contains a known multi-input-RDD codegen operator 
(`SortMergeJoinExec`, `ShuffledHashJoinExec`), a child subtree that contains 
any `Nondeterministic` expression, `children.size` above the configured cap, 
columnar output, or a projection that cannot codegen.
   - `doProduce` emits a `switch` on a per-partition child-index array and 
dispatches to a helper method per child. Each helper wraps the child's produced 
code and takes `int partitionIndex` as a parameter, which keeps code local to 
each helper (important because `ctx.addNewFunction` may spill helpers into a 
nested class).
   - `doConsume` projects each child's output to the Union's output schema 
(casts inserted where necessary) and forwards to the parent's `consume`.
   - `inputRDDs` returns a single `UnionRDD` of the children's input RDDs; the 
switch uses `UnionPartition.parentRddIndex` to decide which child's helper to 
call per task.
   - `needCopyResult` propagates up if any child needs it; `usedInputs = 
AttributeSet.empty` (projection happens in `doConsume`).
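
The dispatch shape described for `doProduce` can be pictured with a small, Spark-free Java sketch. This is not the code the PR generates: the class name, the helper names, and the `childIndexByPartition` array are hypothetical, and how the value passed as `partitionIndex` is derived is left out. It only illustrates a `switch` on a per-partition child index that calls one helper per child, with `partitionIndex` passed as a parameter so that a helper keeps working if it is moved into another class.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for a fused Union stage. All names are hypothetical;
// this is not the code that UnionExec.doProduce emits.
final class FusedUnionSketch {

    // One helper per child. Everything a helper needs arrives as a parameter,
    // so it does not depend on fields of whichever class ends up hosting it.
    private static void produceChild0(int partitionIndex, List<String> out) {
        out.add("child0@" + partitionIndex);
    }

    private static void produceChild1(int partitionIndex, List<String> out) {
        out.add("child1@" + partitionIndex);
    }

    private static void produceChild2(int partitionIndex, List<String> out) {
        out.add("child2@" + partitionIndex);
    }

    // childIndexByPartition[p] names the child that owns output partition p.
    static List<String> processPartition(int[] childIndexByPartition, int partitionIndex) {
        List<String> out = new ArrayList<>();
        switch (childIndexByPartition[partitionIndex]) {
            case 0: produceChild0(partitionIndex, out); break;
            case 1: produceChild1(partitionIndex, out); break;
            case 2: produceChild2(partitionIndex, out); break;
            default: throw new IllegalStateException("unknown child index");
        }
        return out;
    }

    public static void main(String[] args) {
        int[] owner = {0, 0, 1, 2}; // four output partitions, three children
        for (int p = 0; p < owner.length; p++) {
            System.out.println(processPartition(owner, p));
        }
    }
}
```

Each task evaluates the `switch` once for its partition, so only one child's helper runs per task.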
   
   Support infrastructure:
   
   - `CodegenContext.currentPartitionIndexVar` — indirection so that leaf 
operators that embed `partitionIndex` (e.g. `RangeExec.initRange`, 
`SampleExec`'s random-seed and skip-count) read a Union-provided child-local 
index when fused, and the literal `partitionIndex` otherwise.
   - Two new internal SQLConf entries:
     - `spark.sql.codegen.union.enabled` — default `false`. This is the initial 
implementation, so we prefer that users opt in explicitly rather than have a 
default flipped on them.
     - `spark.sql.codegen.union.maxChildren` — default `64`, enforced `>= 2` 
(since `EliminateUnions` removes single-child unions during analysis). Only 
effective when `spark.sql.codegen.union.enabled` is `true`.
   - `UnionExec.metrics` exposes `numOutputRows` only when fusion is active; 
when the flag is off, `UnionExec` has no metrics (matches pre-patch behavior).
   - `NUM_CHILDREN` `LogKey` for the fallback `logDebug`.
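
To see why a child-local index is needed, recall that `UnionRDD` lays out its partitions child by child, in order. The sketch below is plain Java with hypothetical names (`layout`, `childLocalIndex`); it is not the PR's implementation. It computes, for each union partition, the owning child (the role played by `UnionPartition.parentRddIndex`) and the index that child would see if it ran on its own.

```java
import java.util.Arrays;

// Hypothetical model of how union partitions map back to child partitions.
final class UnionPartitionLayoutSketch {

    // Returns { parentRddIndex[], childLocalIndex[] }, one entry per union partition.
    static int[][] layout(int[] partitionsPerChild) {
        int total = 0;
        for (int n : partitionsPerChild) {
            total += n;
        }
        int[] parentRddIndex = new int[total];
        int[] childLocalIndex = new int[total];
        int pos = 0;
        // Children are concatenated in order, each contributing all its partitions.
        for (int child = 0; child < partitionsPerChild.length; child++) {
            for (int local = 0; local < partitionsPerChild[child]; local++) {
                parentRddIndex[pos] = child;
                childLocalIndex[pos] = local;
                pos++;
            }
        }
        return new int[][] {parentRddIndex, childLocalIndex};
    }

    public static void main(String[] args) {
        int[][] m = layout(new int[] {2, 3, 1});
        System.out.println(Arrays.toString(m[0])); // owning child per union partition
        System.out.println(Arrays.toString(m[1])); // index local to that child
    }
}
```

With children of 2, 3, and 1 partitions, union partition 3 belongs to the second child and is that child's partition 1. A leaf that derives its work from the partition index would need the value 1 there, not 3.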
   
   ### Why are the changes needed?
   Before this change, a plan like `Union(t1, t2, t3)` always acts as a 
whole-stage codegen (WSCG) boundary: each child compiles into its own generated 
class and `UnionExec` 
executes interpreted. That produces N+1 generated classes per Union plus the 
overhead of stitching interpreted output back into WSCG for the parent. Query 
shapes that `UNION ALL` several codegen-friendly branches — partitioned 
fact-table unions in ETL, per-channel rollups, decorrelated sub-plans — pay 
this cost per invocation. Fusing into one stage compiles once and streams 
through a single `processNext`.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No. The feature is off by default, and both new configs are `.internal()`.
   
   
   ### How was this patch tested?
   - `UnionCodegenSuite` covers correctness under codegen, parity against the 
interpreted path via `assertFlagParity`, every fallback branch in 
`supportCodegenFailureReason`, projection pushdown, column pruning, metrics, 
large-N stress, and a regression guard against indirect nested unions (e.g. 
`Union(Project(Union,...), ...)`). `UnionCodegenAnsiSuite` and 
`UnionCodegenAqeSuite` (same file) re-run the full suite with ANSI and AQE 
enabled.
   - `SQLMetricsSuite` — the existing `SPARK-25278` regression test is 
unchanged (passes because `UnionExec.metrics` is empty when fusion is off). One 
new test verifies `UnionExec.numOutputRows` reports the total row count under 
fusion.
   - Existing Spark codegen and TPC-DS plan-stability suites run unchanged 
(with the flag off, this PR's `doExecute` behavior is identical to master, so 
the TPC-DS goldens are unchanged).
   - `UnionBenchmark` — added; 4 scenarios (plain, type widening, per-child 
ops, Union + downstream aggregate) covering N ∈ {2 .. 1024}. Results will be 
regenerated via the GHA benchmark workflow.
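
The fallback branches that the suite exercises can be read as an ordered series of checks that returns the first reason fusion is declined. The Java sketch below is a rough model only: `PlanFacts`, its fields, the reason strings, and the order of the checks are assumptions for illustration, not the contents of `supportCodegenFailureReason`.

```java
import java.util.Optional;

// Hypothetical model of the fallback decision. Field names, reason strings,
// and check order are assumptions, not the PR's actual code.
final class UnionFallbackSketch {

    // Plain facts about a Union plan; defaults describe a plan that can fuse.
    static final class PlanFacts {
        boolean flagEnabled = true;
        boolean partitioningAware = false;
        boolean unionInSubtree = false;
        boolean multiInputRddOperatorInChild = false; // e.g. SortMergeJoinExec
        boolean nondeterministicInChild = false;
        int numChildren = 2;
        boolean columnarOutput = false;
        boolean projectionCanCodegen = true;
    }

    // Returns the first reason to fall back, or empty if fusion may proceed.
    static Optional<String> failureReason(PlanFacts p, int maxChildren) {
        if (!p.flagEnabled) return Optional.of("flag off");
        if (p.partitioningAware) return Optional.of("partitioning-aware output");
        if (p.unionInSubtree) return Optional.of("nested UnionExec");
        if (p.multiInputRddOperatorInChild) return Optional.of("multi-input-RDD operator");
        if (p.nondeterministicInChild) return Optional.of("nondeterministic expression");
        if (p.numChildren > maxChildren) return Optional.of("too many children");
        if (p.columnarOutput) return Optional.of("columnar output");
        if (!p.projectionCanCodegen) return Optional.of("projection cannot codegen");
        return Optional.empty();
    }

    public static void main(String[] args) {
        PlanFacts facts = new PlanFacts();
        facts.numChildren = 65;
        System.out.println(failureReason(facts, 64)); // declined: above the cap
    }
}
```

Returning a reason rather than a bare boolean lets a test assert on each branch separately, which is one plausible way to cover every fallback case.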
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   Generated-by: Claude Code
   


