EnricoMi opened a new pull request, #39131:
URL: https://github.com/apache/spark/pull/39131

   ### What changes were proposed in this pull request?
   Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an 
`Aggregate` when the join condition references an attribute that exists in its 
right plan and its left plan's child. This usually happens when the anti-join / 
semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those 
attributes (in this example due to the projection of `value` to `id`).
   
   This behaviour already exists for `Project` and `Union`, but `Aggregate` 
lacks this safety guard.
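   
   The guard described above can be sketched in plain Scala without any Spark 
   dependency. This is a simplified model, not the code of this PR: `Attr`, 
   `PushDownGuard` and `canPushThrough` are hypothetical names, and attributes 
   are reduced to a name plus an expression ID such as `id#910`.
   ```scala
   // Simplified model of an attribute: name plus expression ID, e.g. id#910.
   final case class Attr(name: String, exprId: Int)

   object PushDownGuard {
     // Hypothetical helper: pushing the join below the left operator is
     // considered safe only if no attribute referenced by the join condition
     // is produced by both the right side of the join and the child of the
     // left operator.
     def canPushThrough(
         conditionRefs: Set[Attr],
         leftChildOutput: Set[Attr],
         rightOutput: Set[Attr]): Boolean =
       conditionRefs.intersect(rightOutput).intersect(leftChildOutput).isEmpty
   }
   ```
   In the example from this description, the condition references `id#912` and 
   `id#910`, and both the right plan and the child of the left `Aggregate` 
   output `id#910`, so this check would reject the push-down.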
   
   ### Why are the changes needed?
   Without this change, the optimizer creates an incorrect plan.
   
   This example fails with `distinct()` (an aggregation) and succeeds without 
it, although both queries should return the same result:
   ```scala
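   // Assumes a spark-shell session (or `import spark.implicits._`) for toDF and $"...".
   val ids = Seq(1, 2, 3).toDF("id").distinct()
   // Anti-join the shifted ids (2, 3, 4) against the original ids (1, 2, 3).
   val result = ids.withColumn("id", $"id" + 1).join(ids, "id", "left_anti").collect()
   // Only id = 4 has no match on the right side.
   assert(result.length == 1)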
   ```
   With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition 
`(value#907 + 1) = value#907`, which can never be true. This effectively 
removes the anti-join.
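   
   Why a condition that is never true removes the anti-join can be illustrated 
   with plain Scala collections. This is only a model of the semantics, not 
   Spark code; `AntiJoinModel` and `leftAnti` are hypothetical names.
   ```scala
   object AntiJoinModel {
     // Left anti-join: keep the left rows for which no right row satisfies cond.
     def leftAnti[A, B](left: Seq[A], right: Seq[B])(cond: (A, B) => Boolean): Seq[A] =
       left.filterNot(l => right.exists(r => cond(l, r)))

     val ids: Seq[Int] = Seq(1, 2, 3)
     val shifted: Seq[Int] = ids.map(_ + 1) // 2, 3, 4

     // Intended query: shifted ids that do not occur in ids.
     val expected: Seq[Int] = leftAnti(shifted, ids)(_ == _)

     // Model of the wrong plan: the join runs below the "+ 1" projection and
     // both sides of the condition refer to the same left value, so the
     // condition never holds and every left row survives.
     val broken: Seq[Int] = leftAnti(ids, ids)((l, _) => l + 1 == l).map(_ + 1)
   }
   ```
   Here `expected` contains a single row, while `broken` keeps all three rows, 
   which matches the failing `assert(result.length == 1)` above.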
   
   **Before this PR:**
   The anti-join is fully removed from the plan.
   ```
   == Physical Plan ==
   AdaptiveSparkPlan (16)
   +- == Final Plan ==
      LocalTableScan (1)
   
   (16) AdaptiveSparkPlan
   Output [1]: [id#900]
   Arguments: isFinalPlan=true
   ```
   
   This is caused by `PushDownLeftSemiAntiJoin` adding join condition 
`(value#907 + 1) = value#907`, which is wrong because `id#910` in `(id#910 + 
1) AS id#912` exists in the right child of the join as well as in the left 
grandchild:
   ```
   === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
   !Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
   !:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
   !:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
   !:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
   !+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
   !   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
   !      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
   ```
   
   The right child of the join and the left grandchild would become the 
children of the pushed-down join, which creates an invalid join condition.
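   
   How the invalid condition arises can be traced with a small model of alias 
   substitution. This is a sketch under simplifying assumptions, not Catalyst 
   code: the expression types and `AliasSubstitution` are hypothetical, and 
   the aliases are taken from the plan shown above.
   ```scala
   sealed trait Expr
   final case class Ref(name: String, id: Int) extends Expr
   final case class Plus(child: Expr, n: Int) extends Expr
   final case class EqualTo(left: Expr, right: Expr) extends Expr

   object AliasSubstitution {
     // Replace every reference that has an alias definition by its definition.
     def substitute(e: Expr, aliases: Map[Ref, Expr]): Expr = e match {
       case r: Ref        => aliases.getOrElse(r, r)
       case Plus(c, n)    => Plus(substitute(c, aliases), n)
       case EqualTo(l, r) => EqualTo(substitute(l, aliases), substitute(r, aliases))
     }

     val id910 = Ref("id", 910)
     val id912 = Ref("id", 912)
     val value907 = Ref("value", 907)

     // Original join condition: id#912 = id#910
     val condition: Expr = EqualTo(id912, id910)
     // Below the Aggregate: (id#910 + 1) AS id#912
     val belowAggregate: Expr = substitute(condition, Map(id912 -> Plus(id910, 1)))
     // Below the Project: value#907 AS id#910. The right-hand id#910, which
     // was meant to refer to the right side of the join, is rewritten as well.
     val belowProject: Expr = substitute(belowAggregate, Map(id910 -> value907))
   }
   ```
   In this model `belowProject` corresponds to `(value#907 + 1) = value#907`: 
   the substitution cannot tell the two occurrences of `id#910` apart.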
   
   **After this PR:**
   Join condition `id#912 = id#910`, where `id#912` is defined by `(id#910 + 1) 
AS id#912`, is recognized as ambiguous because both sides of the prospective 
join contain `id#910`. Hence, the join is not pushed down and the rule leaves 
the plan unchanged.
   
   The final plan contains the anti-join:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan (24)
   +- == Final Plan ==
      * BroadcastHashJoin LeftSemi BuildRight (14)
      :- * HashAggregate (7)
      :  +- AQEShuffleRead (6)
      :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
      :        +- Exchange (4)
      :           +- * HashAggregate (3)
      :              +- * Project (2)
      :                 +- * LocalTableScan (1)
      +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
         +- BroadcastExchange (12)
            +- * HashAggregate (11)
               +- AQEShuffleRead (10)
                  +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                     +- ReusedExchange (8)
   
   (8) ReusedExchange [Reuses operator id: 4]
   Output [1]: [id#898]
   
   (24) AdaptiveSparkPlan
   Output [1]: [id#900]
   Arguments: isFinalPlan=true
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, it fixes a correctness issue.
   
   ### How was this patch tested?
   Unit test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

