yaooqinn opened a new pull request, #56091:
URL: https://github.com/apache/spark/pull/56091
### What changes were proposed in this pull request?
This PR adds two complementary rules in `Optimizer.scala` that fold an `Inner`
`Join` when one side is a single-row `LocalRelation` or a `OneRowRelation`:
**1. `ConvertToLocalRelation` case 5/6** (new symmetric arms in the existing
rule):
```scala
// Single-row LocalRelation on the left.
case Join(LocalRelation(out, Seq(row), false, _), other, Inner, cond, JoinHint.NONE)
    if !cond.exists(hasUnevaluableExpr) && !other.isStreaming =>
  foldSingleRowJoin(out, row, leftIsSingleRow = true, other, cond)
// Single-row LocalRelation on the right.
case Join(other, LocalRelation(out, Seq(row), false, _), Inner, cond, JoinHint.NONE)
    if !cond.exists(hasUnevaluableExpr) && !other.isStreaming =>
  foldSingleRowJoin(out, row, leftIsSingleRow = false, other, cond)
```
`foldSingleRowJoin` materializes the single row's typed values as `Literal`s
wrapped in `Alias` with the original `ExprId` preserved (so downstream
`Filter`/`Project` references stay valid), then emits `Project(literals ++
other.output, other)` (or reversed when single-row is on right), optionally
wrapped in `Filter(cond, ...)`.
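
The shape of that rewrite can be sketched with a toy plan model. This is a minimal illustration under stated assumptions, not the PR's implementation: `Attr`, `Ref`, `Lit`, `Alias`, `Table`, `Project`, and `Filter` below are hypothetical stand-ins defined in the snippet, not Catalyst classes, and `Attr.id` plays the role of `ExprId`.

```scala
// Toy model only: every type here is a hypothetical stand-in, not a Spark Catalyst class.
object FoldSingleRowSketch {
  // An attribute is identified by its id, mirroring the role of ExprId.
  final case class Attr(name: String, id: Long)

  sealed trait Expr
  final case class Ref(attr: Attr) extends Expr
  final case class Lit(value: Any) extends Expr
  // Alias re-exposes a literal under the original attribute, so its id is preserved.
  final case class Alias(child: Expr, as: Attr) extends Expr

  sealed trait Plan { def output: Seq[Attr] }
  final case class Table(name: String, output: Seq[Attr]) extends Plan
  final case class Project(exprs: Seq[Expr], child: Plan) extends Plan {
    def output: Seq[Attr] = exprs.collect {
      case Alias(_, a) => a
      case Ref(a) => a
    }
  }
  final case class Filter(cond: Expr, child: Plan) extends Plan {
    def output: Seq[Attr] = child.output
  }

  def foldSingleRowJoin(
      out: Seq[Attr],
      row: Seq[Any],
      leftIsSingleRow: Boolean,
      other: Plan,
      cond: Option[Expr]): Plan = {
    // One aliased literal per column of the single row, keeping the original ids.
    val literals: Seq[Expr] = out.zip(row).map { case (a, v) => Alias(Lit(v), a) }
    val passThrough: Seq[Expr] = other.output.map(Ref(_))
    // Column order follows the side the single-row relation was on.
    val projectList =
      if (leftIsSingleRow) literals ++ passThrough else passThrough ++ literals
    val projected = Project(projectList, other)
    // The join condition, if any, becomes a Filter on top of the Project.
    cond.map(Filter(_, projected)).getOrElse(projected)
  }
}
```

Because each literal is aliased to the attribute it replaces, the folded plan exposes the same output attributes, in the same order, as the original join would have.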
**2. `FoldInnerJoinWithOneRowRelation`** (new independent rule, tree-pattern
`INNER_LIKE_JOIN`):
```scala
case Join(OneRowRelation(), other, Inner, cond, JoinHint.NONE)
    if !cond.exists(hasUnevaluableExpr) && !other.isStreaming =>
  cond.map(Filter(_, other)).getOrElse(other)
// symmetric arm for OneRowRelation on right
```
`OneRowRelation` outputs **0 columns**, so the result is just `other` (no
`Project` needed), optionally filtered. Registered in `RuleIdCollection` and
placed in the `LocalRelation early` and `LocalRelation` batches.
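
As a rough illustration of why no `Project` is needed here, the following toy model folds both arms. It is a sketch under assumptions: `OneRow`, `Table`, `Filter`, and `InnerJoin` are hypothetical types defined in the snippet, conditions are plain strings, and the hint, streaming, and `Unevaluable` guards are omitted.

```scala
// Toy model only: hypothetical stand-ins, not Spark Catalyst classes.
object FoldOneRowSketch {
  sealed trait Plan
  // Stand-in for OneRowRelation: exactly one row, zero columns.
  case object OneRow extends Plan
  final case class Table(name: String) extends Plan
  final case class Filter(cond: String, child: Plan) extends Plan
  final case class InnerJoin(left: Plan, right: Plan, cond: Option[String]) extends Plan

  // Joining with a one-row, zero-column relation adds neither rows nor columns,
  // so the join reduces to the other side, filtered by the condition if present.
  def fold(plan: Plan): Plan = plan match {
    case InnerJoin(OneRow, other, cond) => cond.map(Filter(_, other)).getOrElse(other)
    case InnerJoin(other, OneRow, cond) => cond.map(Filter(_, other)).getOrElse(other)
    case p => p
  }
}
```

For example, `fold(InnerJoin(OneRow, Table("t"), None))` yields `Table("t")` in this model, and a conditioned join yields a `Filter` directly over the table.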
#### Why two rules instead of one?
`OneRowRelation` does **not** publish the `LOCAL_RELATION` tree-pattern.
Folding both into a single rule that uses
`transformWithPruning(_.containsPattern(LOCAL_RELATION))` would silently miss
`OneRowRelation` subtrees. Two rules pin two distinct tree-patterns
(`LOCAL_RELATION` + `INNER_LIKE_JOIN`) for correct pruning.
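
The pruning concern can be sketched with a toy model of pattern bits. This is an illustration under assumptions, not Catalyst's actual `TreePattern` machinery: the `Plan` hierarchy, `ownPatterns`, and the `visited` helper are hypothetical and exist only in this snippet.

```scala
// Toy model only: hypothetical stand-ins for tree-pattern pruning.
object PruningSketch {
  sealed trait Pattern
  case object LOCAL_RELATION extends Pattern
  case object INNER_LIKE_JOIN extends Pattern

  sealed trait Plan {
    def children: Seq[Plan]
    def ownPatterns: Set[Pattern]
    // A subtree contains a pattern if the node or any descendant advertises it.
    lazy val patterns: Set[Pattern] = ownPatterns ++ children.flatMap(_.patterns)
    def containsPattern(p: Pattern): Boolean = patterns.contains(p)
  }
  case object OneRow extends Plan {
    def children: Seq[Plan] = Nil
    def ownPatterns: Set[Pattern] = Set.empty // advertises no LOCAL_RELATION bit
  }
  final case class Local(rows: Int) extends Plan {
    def children: Seq[Plan] = Nil
    def ownPatterns: Set[Pattern] = Set(LOCAL_RELATION)
  }
  final case class Table(name: String) extends Plan {
    def children: Seq[Plan] = Nil
    def ownPatterns: Set[Pattern] = Set.empty
  }
  final case class Join(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
    def ownPatterns: Set[Pattern] = Set(INNER_LIKE_JOIN)
  }

  // Number of nodes a pruned traversal would visit: subtrees failing `cond` are skipped.
  def visited(plan: Plan, cond: Plan => Boolean): Int =
    if (!cond(plan)) 0 else 1 + plan.children.map(visited(_, cond)).sum
}
```

In this model, a traversal pruned on `LOCAL_RELATION` never visits `Join(OneRow, Table("t"))`, while one pruned on `INNER_LIKE_JOIN` does reach the join node.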
### Why are these changes needed?
- Removes a redundant join node for a structurally common pattern:
auto-generated SQL from BI tools, view expansion where the view body collapses
to a single row after other rules have folded it, and user-written queries such as
`WITH single AS (SELECT 1) SELECT ... FROM other CROSS JOIN single`.
- Fulfills the long-standing TODO in `DecorrelateInnerQuery.scala:435`:
`// TODO add a more general rule to optimize join with OneRowRelation`.
- Enables downstream rules (column pruning, constant folding, predicate
pushdown) to see through what was previously an opaque join.
### Does this PR introduce _any_ user-facing change?
No. Query results are bit-identical; only the shape of the optimized logical
plan changes (one fewer `Join`, replaced where needed by a `Project` and/or
`Filter`). No new SQLConf knob is added, because the four narrowings listed
under "Four narrowings" below make the rewrite strictly conservative.
### How was this patch tested?
#### Existing plan-stability suites (322 queries, 0 plan-diff)
| Suite | Tests | Result |
|---|---|---|
| `TPCDSV1_4_PlanStabilitySuite` | 99 | ✅ PASS |
| `TPCDSV1_4_PlanStabilityWithStatsSuite` | 99 | ✅ PASS |
| `TPCDSV2_7_PlanStabilitySuite` | 30 | ✅ PASS |
| `TPCDSV2_7_PlanStabilityWithStatsSuite` | 30 | ✅ PASS |
| `TPCDSModifiedPlanStabilitySuite` | 24 | ✅ PASS |
| `TPCDSModifiedPlanStabilityWithStatsSuite` | 24 | ✅ PASS |
| `TPCHPlanStabilitySuite` | 16 | ✅ PASS |
| **Total** | **322** | **✅ 0 plan-diff** |
#### New unit tests (19/19 PASS)
**`FoldInnerJoinWithOneRowRelationSuite`** (9 tests): OneRow × table
(left/right/no-cond), `LeftOuter` not folded (negative), `ArrayType` /
`MapType` / nested `StructType` columns preserved on the other side, nested
struct-of-array, condition with `Rand()` (Unevaluable) not folded.
**`ConvertToLocalRelationSuite`** (10 tests, T7-T10 new): `LocalRelation` ×
non-`LocalRelation` left/right symmetric, condition referencing both sides
folded into `Filter`, and **ExprId preservation** with strong assertion
(`collectFirst { case _: Join }.isEmpty && output.length == 4`).
#### Four narrowings (negative guards verified)
| Narrowing | ❌ Rejects |
|---|---|
| `JoinType` | `LeftOuter` / `RightOuter` / `FullOuter` / `LeftSemi` / `LeftAnti` / `Cross` |
| `JoinHint` | `BROADCAST` / `SHUFFLE_HASH` / `SHUFFLE_MERGE` / `SHUFFLE_REPLICATE_NL` |
| `LocalRelation.data.length` | `== 0` (already covered by `PropagateEmptyRelation`) / `> 1` |
| condition / streaming | `Unevaluable` expressions / `isStreaming = true` |
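
Read together, the four narrowings amount to a single conjunction. The sketch below restates the table as a predicate over a hypothetical `Candidate` summary; the type, its field names, and `isFoldable` are illustrative assumptions and do not appear in the patch.

```scala
// Toy model only: `Candidate` and `isFoldable` are hypothetical, not part of the patch.
object NarrowingSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType
  case object Cross extends JoinType

  // Summary of the properties the guards inspect on a candidate join.
  final case class Candidate(
      joinType: JoinType,
      hasHint: Boolean,              // any join hint present
      singleSideRowCount: Int,       // rows in the LocalRelation side
      condHasUnevaluable: Boolean,   // join condition contains an Unevaluable expression
      otherIsStreaming: Boolean)     // the non-single-row side is streaming

  // The fold applies only when every narrowing passes.
  def isFoldable(c: Candidate): Boolean =
    c.joinType == Inner &&
      !c.hasHint &&
      c.singleSideRowCount == 1 &&
      !c.condHasUnevaluable &&
      !c.otherIsStreaming
}
```

Any single failing guard leaves the join untouched, which is what makes the rewrite conservative.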
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]