sunchao commented on code in PR #56636:
URL: https://github.com/apache/spark/pull/56636#discussion_r3449401725


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -205,38 +205,47 @@ object PartitionPruning extends Rule[LogicalPlan] with 
PredicateHelper with Join
   }
 
   /**
-   * Returns whether a plan can be evaluated repeatedly from materialized 
inputs and produce the
-   * same rows.
+   * Returns whether the filtering side is cheap enough to recompute that DPP 
is worthwhile even
+   * without a selective predicate: its cost is dominated by an 
already-materialized input, with
+   * only scan-cost-bound operators above it.
    *
-   * LocalRelation rows are already locally available. A checkpoint-derived 
LogicalRDD establishes
-   * an explicit checkpoint boundary and can be used as a broadcast build side 
for DPP without
-   * evaluating the computation upstream of that boundary again.
+   * This is the cost-side counterpart to `hasSelectivePredicate`. A selective 
predicate is
+   * evidence of a high pruning ratio (the benefit term of 
`pruningHasBenefit`); an
+   * already-materialized input is the complementary signal on the cost term 
-- a `LocalRelation`
+   * (rows already local) or a checkpoint-derived `LogicalRDD` 
(`isCheckpointedInput` requires the
+   * RDD to be actually checkpointed, so a lazy checkpoint does not qualify) 
is ~free to re-read,
+   * so even a modest pruning ratio clears the benefit bar. `InMemoryRelation` 
is excluded because
+   * cache()/persist() are lazy: its presence does not guarantee the data has 
been materialized,
+   * and missing or evicted blocks may require recomputing the upstream plan.
    *
-   * InMemoryRelation is intentionally excluded because cache() and persist() 
are lazy: its
-   * presence does not guarantee the cached data has been materialized, and 
missing or evicted
-   * blocks may require evaluating the upstream computation again.
+   * The operators above the materialized input are restricted to ones whose 
cost is dominated by
+   * their input's scan bytes -- the only cost `calculatePlanOverhead` can 
see. `Project`/`Filter`
+   * add negligible compute, a `Union`'s cost is the sum of its (materialized) 
children, and
+   * `SubqueryAlias` is a no-op. `Aggregate`, joins, and opaque RDD operators 
(e.g. `mapPartitions`)
+   * are excluded: they add compute or a shuffle the scan-bytes cost model 
cannot see, so treating
+   * such a side as a cheap materialized input would overstate the pruning 
benefit. A subquery in a
+   * `Project`/`Filter` is excluded for the same reason -- it embeds its own 
plan, whose recompute
+   * cost `calculatePlanOverhead` does not account for.
    *
-   * The supported operators are intentionally narrow. DPP is optional, and 
logical-plan
-   * determinism does not cover user functions stored outside Catalyst 
expressions.
+   * This is a cost guard only; it does not check that re-evaluating the side 
yields the same rows.
+   * DPP duplicates the filtering side on every eligibility path and has 
always assumed it is
+   * repeatable, so a non-deterministic operator above the materialized input 
(e.g. a `rand()`
+   * projection) can still produce different keys on re-evaluation -- the same 
pre-existing,
+   * DPP-wide limitation the selective-predicate path carries, intentionally 
left to a future
+   * system-level design rather than patched piecemeal here. The one 
materialized-input-specific
+   * repeatability concern -- a checkpoint that has not been materialized yet 
-- is handled by
+   * `LogicalRDD.isCheckpointedInput` requiring the RDD to be actually 
checkpointed.
    */
-  private def isRepeatableMaterializedPlan(plan: LogicalPlan): Boolean = {
-    def isRepeatableExpression(expression: Expression): Boolean = {
-      expression.deterministic && !SubqueryExpression.hasSubquery(expression) 
&&
-        !expression.exists {
-          case _: NonSQLExpression | _: UserDefinedExpression | _: 
UserDefinedGenerator => true
-          case _ => false
-        }
-    }
-
+  private def isCheaplyRecomputableMaterializedPlan(plan: LogicalPlan): 
Boolean = {
     plan match {
       case _: LocalRelation => true
       case r: LogicalRDD => r.isCheckpointedInput
-      case Project(projectList, child) if 
projectList.forall(isRepeatableExpression) =>
-        isRepeatableMaterializedPlan(child)
-      case Filter(condition, child) if isRepeatableExpression(condition) =>
-        isRepeatableMaterializedPlan(child)
-      case u: Union => u.children.forall(isRepeatableMaterializedPlan)
-      case SubqueryAlias(_, child) => isRepeatableMaterializedPlan(child)
+      case Project(projectList, child) if 
!projectList.exists(SubqueryExpression.hasSubquery) =>

Review Comment:
   [P2] Keep opaque UDF work out of the cheap path
   
   This only checks for subqueries, so a `Project` containing an arbitrary 
Scala/Python UDF is treated as cheaply recomputable even though 
`calculatePlanOverhead` sees none of its CPU or I/O cost. With 
`reuseBroadcastOnly=false` and no reusable exchange, standalone DPP evaluates 
the filtering plan once to build pruning keys, and the join evaluates it again.
   
   For example, with `keys = checkpoint.select(expensiveUdf($"raw").as("p"))`, 
I measured the UDF twice at this head versus once with the parent guard; the 
parent produces no standalone DPP. Please retain the `NonSQLExpression` / 
`UserDefinedExpression` / `UserDefinedGenerator` exclusion as a cost guard 
(without necessarily restoring the determinism check), or otherwise account for 
opaque expression cost.



##########
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala:
##########
@@ -1880,13 +1880,24 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
           DppMaterializedInputTestState.reset(counterId)
           assert(df.collect().toSeq === Seq(Row(1)))
           assert(activeDppSubqueries(df).isEmpty,
-            s"Shouldn't trigger DPP for a non-repeatable materialized plan:\n" 
+
+            s"Shouldn't trigger DPP for an opaque materialized plan:\n" +
               df.queryExecution)
         }
 
         checkStandaloneDpp(Seq(1).toDF("p"))
         checkStandaloneDpp(Seq(1).toDF("p").localCheckpoint(eager = true))
 
+        // Cheap, scan-cost-bound operators above a materialized input stay 
eligible: their
+        // recompute cost is dominated by the materialized leaf that 
calculatePlanOverhead sees.
+        checkStandaloneDpp(
+          Seq(1).toDF("p").localCheckpoint(eager = true).filter($"p" > 
0).select($"p"))

Review Comment:
   [P3] Make this new filter case exercise materialized-input eligibility
   
   `p > 0` is a `BinaryComparison`, so `isLikelySelective` returns true. 
`hasPartitionPruningFilter` therefore succeeds through `hasSelectivePredicate` 
without consulting `isCheaplyRecomputableMaterializedPlan`. I verified that 
this isolated assertion still produces standalone DPP with both the `Project` 
and `Filter` cases in that helper disabled.
   
   The earlier assertions do cover `Project` recursion, so the gap is 
specifically this new `Filter` / materialized-cost case. Please use a retained 
condition that is not classified as selective, or directly assert the intended 
helper path.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -205,38 +205,47 @@ object PartitionPruning extends Rule[LogicalPlan] with 
PredicateHelper with Join
   }
 
   /**
-   * Returns whether a plan can be evaluated repeatedly from materialized 
inputs and produce the
-   * same rows.
+   * Returns whether the filtering side is cheap enough to recompute that DPP 
is worthwhile even
+   * without a selective predicate: its cost is dominated by an 
already-materialized input, with
+   * only scan-cost-bound operators above it.
    *
-   * LocalRelation rows are already locally available. A checkpoint-derived 
LogicalRDD establishes
-   * an explicit checkpoint boundary and can be used as a broadcast build side 
for DPP without
-   * evaluating the computation upstream of that boundary again.
+   * This is the cost-side counterpart to `hasSelectivePredicate`. A selective 
predicate is
+   * evidence of a high pruning ratio (the benefit term of 
`pruningHasBenefit`); an
+   * already-materialized input is the complementary signal on the cost term 
-- a `LocalRelation`
+   * (rows already local) or a checkpoint-derived `LogicalRDD` 
(`isCheckpointedInput` requires the
+   * RDD to be actually checkpointed, so a lazy checkpoint does not qualify) 
is ~free to re-read,
+   * so even a modest pruning ratio clears the benefit bar. `InMemoryRelation` 
is excluded because
+   * cache()/persist() are lazy: its presence does not guarantee the data has 
been materialized,
+   * and missing or evicted blocks may require recomputing the upstream plan.
    *
-   * InMemoryRelation is intentionally excluded because cache() and persist() 
are lazy: its
-   * presence does not guarantee the cached data has been materialized, and 
missing or evicted
-   * blocks may require evaluating the upstream computation again.
+   * The operators above the materialized input are restricted to ones whose 
cost is dominated by
+   * their input's scan bytes -- the only cost `calculatePlanOverhead` can 
see. `Project`/`Filter`
+   * add negligible compute, a `Union`'s cost is the sum of its (materialized) 
children, and
+   * `SubqueryAlias` is a no-op. `Aggregate`, joins, and opaque RDD operators 
(e.g. `mapPartitions`)
+   * are excluded: they add compute or a shuffle the scan-bytes cost model 
cannot see, so treating
+   * such a side as a cheap materialized input would overstate the pruning 
benefit. A subquery in a
+   * `Project`/`Filter` is excluded for the same reason -- it embeds its own 
plan, whose recompute
+   * cost `calculatePlanOverhead` does not account for.
    *
-   * The supported operators are intentionally narrow. DPP is optional, and 
logical-plan
-   * determinism does not cover user functions stored outside Catalyst 
expressions.
+   * This is a cost guard only; it does not check that re-evaluating the side 
yields the same rows.
+   * DPP duplicates the filtering side on every eligibility path and has 
always assumed it is

Review Comment:
   [P3] Correct the nondeterministic-DPP rationale
   
   The `rand()` example in this paragraph does not reach physical DPP under the 
normal optimizer flow. `DynamicPruningSubquery.deterministic` includes its 
build query; `rand()` makes it nondeterministic, so 
`CleanupDynamicPruningFilters` cannot match 
`NodeWithOnlyDeterministicProjectAndFilter` and replaces the DPP predicate with 
true before physical planning. A reusable broadcast also does not duplicate the 
build side; only the standalone, non-reused branch plans a `SubqueryExec`.
   
   I verified this with `rand()`, a UDF explicitly marked nondeterministic, 
broadcast reuse, and AQE. It would be more accurate to describe hidden 
nondeterminism left marked deterministic as the remaining limitation, rather 
than say every eligibility path re-evaluates `rand()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to