sunchao commented on code in PR #56636:
URL: https://github.com/apache/spark/pull/56636#discussion_r3448758712


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -205,50 +205,37 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
   }
 
   /**
-   * Returns whether a plan can be evaluated repeatedly from materialized inputs and produce the
-   * same rows.
-   *
-   * LocalRelation rows are already locally available. A checkpoint-derived LogicalRDD establishes
-   * an explicit checkpoint boundary and can be used as a broadcast build side for DPP without
-   * evaluating the computation upstream of that boundary again.
+   * Returns whether the plan contains an already-materialized input that can serve as a DPP build
+   * side without re-evaluating the computation that produced it: a LocalRelation (rows already
+   * locally available) or a checkpoint-derived LogicalRDD (an explicit, materialized checkpoint
+   * boundary -- `LogicalRDD.isCheckpointedInput` requires the RDD to be actually checkpointed, so a
+   * lazy checkpoint that has not been materialized yet does not qualify).
    *
    * InMemoryRelation is intentionally excluded because cache() and persist() are lazy: its
    * presence does not guarantee the cached data has been materialized, and missing or evicted
    * blocks may require evaluating the upstream computation again.
    *
-   * The supported operators are intentionally narrow. DPP is optional, and logical-plan
-   * determinism does not cover user functions stored outside Catalyst expressions.
+   * Note this does not constrain the operators above the materialized input. Whether re-evaluating
+   * those operators yields the same rows is the general DPP re-evaluation concern -- a
+   * non-deterministic operator (e.g. a `mapPartitions` closure or a UDF over a non-deterministic
+   * source) is not repeatable regardless of whether a leaf below it is materialized, and Spark
+   * cannot decide a plan's repeatability in general. That concern is not specific to materialized
+   * inputs (the selective-predicate path above has it too) and is intentionally not addressed here.
    */
-  private def isRepeatableMaterializedPlan(plan: LogicalPlan): Boolean = {
-    def isRepeatableExpression(expression: Expression): Boolean = {
-      expression.deterministic && !SubqueryExpression.hasSubquery(expression) &&
-        !expression.exists {
-          case _: NonSQLExpression | _: UserDefinedExpression | _: UserDefinedGenerator => true
-          case _ => false
-        }
-    }
-
-    plan match {
-      case _: LocalRelation => true
-      case r: LogicalRDD => r.isCheckpointedInput
-      case Project(projectList, child) if projectList.forall(isRepeatableExpression) =>
-        isRepeatableMaterializedPlan(child)
-      case Filter(condition, child) if isRepeatableExpression(condition) =>
-        isRepeatableMaterializedPlan(child)
-      case u: Union => u.children.forall(isRepeatableMaterializedPlan)
-      case SubqueryAlias(_, child) => isRepeatableMaterializedPlan(child)
-      case _ => false
-    }
+  private def hasMaterializedInput(plan: LogicalPlan): Boolean = plan.exists {

Review Comment:
   [P1] Do not use one materialized descendant to qualify an opaque filtering 
plan
   
   `plan.exists` makes the whole filtering side eligible as soon as it finds a 
checkpointed `LogicalRDD`, even when an opaque `MapPartitions` above that 
boundary can return different keys on separate executions.
   
   I reproduced silent row loss on the exact base and head without relying on 
mutable shared state. For example:
   
   ```scala
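   import org.apache.spark.TaskContext

   // Assumes spark.implicits._ is in scope (for .toDS and the Int encoder).
   // The closure sits above the checkpoint boundary and derives its key from the running stage.
   val keys = Seq(0).toDS.localCheckpoint(eager = true).mapPartitions { values =>
     val p = Math.floorMod(TaskContext.get().stageId(), 20)
     values.map(_ => p)
   }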
   ```
   
   The fact table has one row in every partition `p = 0..19`. The target uses a 
merge join, while an unrelated sibling broadcasts the same `keys` but joins 
against `p = -1`. Every valid evaluation must return exactly one target row: 
whichever key is generated exists in the fact table, and the sibling cannot 
match anything.
   
   With the default DPP, AQE, broadcast-reuse-only, and exchange-reuse settings 
(plus legal join hints), base `75260f8` returns one row with no DPP. This head 
`7d08951` returned no rows in two runs: DPP reused the sibling broadcast to 
prune the target, then the target join evaluated `MapPartitions` in a different 
stage and produced a different key. This query has no selective predicate, so 
the older selective-predicate path does not make this regression pre-existing.
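   
   The mismatch can be illustrated without Spark. The following is a minimal sketch in plain Scala, not the actual reproduction: `PruningMismatch`, `generatedKey`, and `targetRows` are hypothetical names, and the stage ids passed in are arbitrary stand-ins for whatever ids the scheduler assigns. It only models the case where pruning and the target join evaluate the closure in different stages.
   
   ```scala
   object PruningMismatch {
     // Models the opaque closure: the key depends only on the stage that runs it.
     def generatedKey(stageId: Int): Int = Math.floorMod(stageId, 20)

     // Fact table with one row in every partition p = 0..19.
     val factPartitions: Seq[Int] = 0 until 20

     // Rows returned by the target join. pruningStage = None models a plan without DPP;
     // Some(stage) models pruning driven by a broadcast that was evaluated in `stage`.
     def targetRows(joinStage: Int, pruningStage: Option[Int]): Seq[Int] = {
       val scanned = pruningStage match {
         case Some(stage) => factPartitions.filter(_ == generatedKey(stage))
         case None => factPartitions
       }
       // The join itself evaluates the closure again, in its own stage.
       scanned.filter(_ == generatedKey(joinStage))
     }
   }
   ```
   
   In this model, `targetRows(7, None)` keeps one row, while `targetRows(7, Some(3))` keeps none, because the pruned scan and the join disagree on the key.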
   
   Please retain conservative eligibility for opaque operators, or otherwise 
ensure pruning uses the same evaluation as the target join. Additional plan 
shapes should only be enabled when their repeatability is established.
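   
   To make the difference between the two eligibility checks concrete, here is a toy sketch in plain Scala. The node types (`CheckpointedInput`, `LazyScan`, `DeterministicFilter`, `OpaqueMapPartitions`, `UnionAll`) are hypothetical stand-ins, not Catalyst classes, and `isRepeatable` is only an assumption about what a conservative allowlist could look like, loosely following the shape of the removed `isRepeatableMaterializedPlan`.
   
   ```scala
   // Toy stand-ins for logical plan nodes; these are NOT Spark classes.
   sealed trait Plan { def children: Seq[Plan] }
   case object CheckpointedInput extends Plan { val children: Seq[Plan] = Nil }
   case object LazyScan extends Plan { val children: Seq[Plan] = Nil }
   final case class DeterministicFilter(child: Plan) extends Plan {
     def children: Seq[Plan] = Seq(child)
   }
   final case class OpaqueMapPartitions(child: Plan) extends Plan {
     def children: Seq[Plan] = Seq(child)
   }
   final case class UnionAll(children: Seq[Plan]) extends Plan

   object Eligibility {
     // Mirrors the `plan.exists` shape: true as soon as any node is a materialized input.
     def hasMaterializedInput(plan: Plan): Boolean =
       plan == CheckpointedInput || plan.children.exists(hasMaterializedInput)

     // Allowlist shape: every operator between the root and the leaves must be known
     // to be repeatable; anything unrecognized is rejected.
     def isRepeatable(plan: Plan): Boolean = plan match {
       case CheckpointedInput => true
       case DeterministicFilter(child) => isRepeatable(child)
       case UnionAll(children) => children.forall(isRepeatable)
       case _ => false
     }
   }
   ```
   
   For `OpaqueMapPartitions(CheckpointedInput)`, `hasMaterializedInput` returns `true` while `isRepeatable` returns `false`, which is the gap described above.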



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

