cloud-fan commented on code in PR #56636:
URL: https://github.com/apache/spark/pull/56636#discussion_r3449028178


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -205,50 +205,37 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
   }
 
   /**
-   * Returns whether a plan can be evaluated repeatedly from materialized inputs and produce the
-   * same rows.
-   *
-   * LocalRelation rows are already locally available. A checkpoint-derived LogicalRDD establishes
-   * an explicit checkpoint boundary and can be used as a broadcast build side for DPP without
-   * evaluating the computation upstream of that boundary again.
+   * Returns whether the plan contains an already-materialized input that can serve as a DPP build
+   * side without re-evaluating the computation that produced it: a LocalRelation (rows already
+   * locally available) or a checkpoint-derived LogicalRDD (an explicit, materialized checkpoint
+   * boundary -- `LogicalRDD.isCheckpointedInput` requires the RDD to be actually checkpointed, so a
+   * lazy checkpoint that has not been materialized yet does not qualify).
    *
    * InMemoryRelation is intentionally excluded because cache() and persist() are lazy: its
    * presence does not guarantee the cached data has been materialized, and missing or evicted
    * blocks may require evaluating the upstream computation again.
    *
-   * The supported operators are intentionally narrow. DPP is optional, and logical-plan
-   * determinism does not cover user functions stored outside Catalyst expressions.
+   * Note this does not constrain the operators above the materialized input. Whether re-evaluating
+   * those operators yields the same rows is the general DPP re-evaluation concern -- a
+   * non-deterministic operator (e.g. a `mapPartitions` closure or a UDF over a non-deterministic
+   * source) is not repeatable regardless of whether a leaf below it is materialized, and Spark
+   * cannot decide a plan's repeatability in general. That concern is not specific to materialized
+   * inputs (the selective-predicate path above has it too) and is intentionally not addressed here.
    */
-  private def isRepeatableMaterializedPlan(plan: LogicalPlan): Boolean = {
-    def isRepeatableExpression(expression: Expression): Boolean = {
-      expression.deterministic && !SubqueryExpression.hasSubquery(expression) &&
-        !expression.exists {
-          case _: NonSQLExpression | _: UserDefinedExpression | _: UserDefinedGenerator => true
-          case _ => false
-        }
-    }
-
-    plan match {
-      case _: LocalRelation => true
-      case r: LogicalRDD => r.isCheckpointedInput
-      case Project(projectList, child) if projectList.forall(isRepeatableExpression) =>
-        isRepeatableMaterializedPlan(child)
-      case Filter(condition, child) if isRepeatableExpression(condition) =>
-        isRepeatableMaterializedPlan(child)
-      case u: Union => u.children.forall(isRepeatableMaterializedPlan)
-      case SubqueryAlias(_, child) => isRepeatableMaterializedPlan(child)
-      case _ => false
-    }
+  private def hasMaterializedInput(plan: LogicalPlan): Boolean = plan.exists {

Review Comment:
   Thanks, you're right -- I'm not reverting to a leaf-anywhere `plan.exists` 
check. I've kept the structural check and re-justified it as a recompute-cost 
guard: a materialized leaf only makes a side eligible when the operators above 
it are scan-cost-bound (the only cost `calculatePlanOverhead` can see). Opaque 
RDD operators like `mapPartitions` remain excluded, so the silent row loss in 
your repro doesn't occur -- a single materialized descendant can't qualify an 
opaque parent. Updated the PR title/description to reflect this.
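
For illustration only, here is a minimal, self-contained sketch of the difference between the leaf-anywhere check and the structural check described above. It uses toy plan nodes, not the real Catalyst classes: `Plan`, `LocalRows`, `RddLeaf`, `Opaque`, and the `DppEligibility` helpers are all hypothetical names, and the sketch leaves out the per-expression repeatability checks and the cost logic in `calculatePlanOverhead`.

```scala
// Toy stand-ins for logical plan nodes. These are NOT Spark's Catalyst classes.
sealed trait Plan { def children: Seq[Plan] }
case object LocalRows extends Plan { val children: Seq[Plan] = Nil } // models LocalRelation
final case class RddLeaf(checkpointed: Boolean) extends Plan {       // models LogicalRDD
  val children: Seq[Plan] = Nil
}
final case class Filter(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
final case class Project(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
final case class Aggregate(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
// Models an opaque RDD operator such as a mapPartitions closure.
final case class Opaque(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
final case class Union(children: Seq[Plan]) extends Plan

object DppEligibility {
  private def isMaterializedLeaf(plan: Plan): Boolean = plan match {
    case LocalRows             => true
    case RddLeaf(checkpointed) => checkpointed
    case _                     => false
  }

  // True if the predicate holds for this node or any descendant.
  private def exists(plan: Plan)(f: Plan => Boolean): Boolean =
    f(plan) || plan.children.exists(child => exists(child)(f))

  // Leaf-anywhere check: one materialized descendant is enough,
  // whatever operators sit above it.
  def hasMaterializedLeafAnywhere(plan: Plan): Boolean =
    exists(plan)(isMaterializedLeaf)

  // Structural check: every operator on every path must be on the allow-list,
  // and every leaf must be materialized.
  def isEligibleStructurally(plan: Plan): Boolean = plan match {
    case LocalRows             => true
    case RddLeaf(checkpointed) => checkpointed
    case Filter(child)         => isEligibleStructurally(child)
    case Project(child)        => isEligibleStructurally(child)
    case Union(children)       => children.forall(isEligibleStructurally)
    case _                     => false // Aggregate, Opaque, and anything unknown
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val opaqueOverCheckpoint = Opaque(RddLeaf(checkpointed = true))
    // The leaf-anywhere check accepts this plan; the structural check rejects it.
    println(DppEligibility.hasMaterializedLeafAnywhere(opaqueOverCheckpoint))
    println(DppEligibility.isEligibleStructurally(opaqueOverCheckpoint))
  }
}
```

In this toy model the allow-list is the whole point: the default case rejects any operator it does not recognize, so a materialized descendant alone never qualifies its parent.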



##########
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala:
##########
@@ -1801,140 +1797,6 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
     }
   }
 
-  test("DPP requires every leaf of a materialized filtering side to be materialized") {

Review Comment:
   Good point on locking in behavior -- I've restored both tests (this revision had 
removed them) rather than leaving the enabled case untested. On your specific 
`groupBy(...).agg(...)` example: under the cost framing that case stays 
*ineligible* by design, since an `Aggregate`'s shuffle/compute cost is 
invisible to `calculatePlanOverhead` and treating it as a cheap materialized 
input would overstate the benefit -- I added an explicit negative test 
documenting that. I also added a positive test that a cheap `filter`/`select` 
chain over a checkpoint does produce a standalone DPP subquery, to pin the 
enabled case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

