sunchao commented on code in PR #56636:
URL: https://github.com/apache/spark/pull/56636#discussion_r3450350049


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -205,38 +205,64 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
   }
 
   /**
-   * Returns whether a plan can be evaluated repeatedly from materialized inputs and produce the
-   * same rows.
+   * Returns whether the filtering side is cheap enough to recompute that DPP is worthwhile even
+   * without a selective predicate: its cost is dominated by an already-materialized input, with
+   * only scan-cost-bound operators above it.
    *
-   * LocalRelation rows are already locally available. A checkpoint-derived LogicalRDD establishes
-   * an explicit checkpoint boundary and can be used as a broadcast build side for DPP without
-   * evaluating the computation upstream of that boundary again.
+   * This is the cost-side counterpart to `hasSelectivePredicate`. A selective predicate is
+   * evidence of a high pruning ratio (the benefit term of `pruningHasBenefit`); an
+   * already-materialized input is the complementary signal on the cost term -- a `LocalRelation`
+   * (rows already local) or a checkpoint-derived `LogicalRDD` (`isCheckpointedInput` requires the
+   * RDD to be actually checkpointed, so a lazy checkpoint does not qualify) is ~free to re-read,
+   * so even a modest pruning ratio clears the benefit bar. `InMemoryRelation` is excluded because
+   * cache()/persist() are lazy: its presence does not guarantee the data has been materialized,
+   * and missing or evicted blocks may require recomputing the upstream plan.
    *
-   * InMemoryRelation is intentionally excluded because cache() and persist() are lazy: its
-   * presence does not guarantee the cached data has been materialized, and missing or evicted
-   * blocks may require evaluating the upstream computation again.
+   * The operators above the materialized input are restricted to ones whose cost is dominated by
+   * their input's scan bytes -- the only cost `calculatePlanOverhead` can see. `Project`/`Filter`
+   * add negligible compute, a `Union`'s cost is the sum of its (materialized) children, and
+   * `SubqueryAlias` is a no-op. `Aggregate`, joins, and opaque RDD operators (e.g. `mapPartitions`)
+   * are excluded: they add compute or a shuffle the scan-bytes cost model cannot see, so treating
+   * such a side as a cheap materialized input would overstate the pruning benefit. A `Project`/
+   * `Filter` is likewise excluded when its expressions embed a subquery (which carries its own
+   * plan) or an opaque user function (a UDF or a user-defined generator) -- both add recompute
+   * cost `calculatePlanOverhead` does not account for.
    *
-   * The supported operators are intentionally narrow. DPP is optional, and logical-plan
-   * determinism does not cover user functions stored outside Catalyst expressions.
+   * This is primarily a cost guard, but the eligible shapes are also repeatable in practice, which
+   * matters because DPP duplicates the filtering side and must produce the same keys on
+   * re-evaluation. Honest non-determinism does not slip through: a `rand()` (or a UDF marked
+   * non-deterministic) above the materialized input makes the resulting `DynamicPruningSubquery`
+   * non-deterministic (`PlanExpression.deterministic` folds in its build plan), so
+   * `CleanupDynamicPruningFilters` rewrites the dynamic predicate to `true` before physical
+   * planning rather than planning a standalone `SubqueryExec` -- it is never re-evaluated. The
+   * residual, DPP-wide limitation is *hidden* non-determinism left marked deterministic; the
+   * opaque-expression exclusion above narrows it, and the rest is intentionally left to a future
+   * system-level design rather than patched piecemeal here. The one materialized-input-specific
+   * repeatability concern -- a checkpoint that has not been materialized yet -- is handled by
+   * `LogicalRDD.isCheckpointedInput` requiring the RDD to be actually checkpointed.
    */
-  private def isRepeatableMaterializedPlan(plan: LogicalPlan): Boolean = {
-    def isRepeatableExpression(expression: Expression): Boolean = {
-      expression.deterministic && !SubqueryExpression.hasSubquery(expression) &&
-        !expression.exists {
-          case _: NonSQLExpression | _: UserDefinedExpression | _: UserDefinedGenerator => true
-          case _ => false
-        }
+  private def isCheaplyRecomputableMaterializedPlan(plan: LogicalPlan): Boolean = {
+    // An expression keeps the side cheap only if its cost is bounded by the input scan that
+    // `calculatePlanOverhead` measures. A subquery embeds its own plan, and an opaque user
+    // function (a Scala/Python UDF, a user-defined generator, or any other non-Catalyst
+    // expression) adds CPU/IO the scan-bytes cost model cannot see -- recomputing either would
+    // cost more than the materialized leaf suggests, so it disqualifies the side.
+    def isScanCostBoundExpression(e: Expression): Boolean = {
+      !SubqueryExpression.hasSubquery(e) && !e.exists {

Review Comment:
   [P1] Do not make DPP correctness depend on an excludable cleanup rule
   
   Dropping `e.deterministic` here admits Catalyst-visible nondeterminism that carries none of these opaque marker types. `CallMethodViaReflection` (the expression behind the `reflect` and `java_method` SQL functions), for example, extends `Nondeterministic` but not `NonSQLExpression`, `UserDefinedExpression`, or `UserDefinedGenerator`. The new rationale relies on `CleanupDynamicPruningFilters` to remove the DPP predicate, but that rule is not in `SparkOptimizer.nonExcludableRules` and can be disabled through the documented `spark.sql.optimizer.excludedRules` configuration.
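   To make the gap concrete, here is a toy Scala model of the removed and new expression checks. Everything in it is hypothetical: the traits only borrow the names of Catalyst's marker traits and are not Spark's classes, `Reflect` stands in for `CallMethodViaReflection`, and the subquery test is left out because the toy has no subqueries.

```scala
// Hypothetical toy model: these traits mimic the *names* of Catalyst's marker
// traits to illustrate the review point; they are NOT Spark's classes.
object ToyExpressions {
  sealed trait Expr {
    def children: Seq[Expr]
    // Catalyst-style: deterministic only if every child is deterministic.
    def deterministic: Boolean = children.forall(_.deterministic)
    // Pre-order search over the expression tree.
    def exists(p: Expr => Boolean): Boolean = p(this) || children.exists(_.exists(p))
  }
  trait NonSQLExpression extends Expr
  trait UserDefinedExpression extends Expr
  trait UserDefinedGenerator extends Expr
  trait Nondeterministic extends Expr {
    override def deterministic: Boolean = false
  }

  final case class Column(name: String) extends Expr {
    def children: Seq[Expr] = Nil
  }
  // An opaque user function: carries one of the marker traits.
  final case class ScalaUdf(child: Expr) extends UserDefinedExpression {
    def children: Seq[Expr] = Seq(child)
  }
  // Stand-in for a reflect() call: nondeterministic, but with no opaque marker.
  final case class Reflect(args: Seq[Expr]) extends Nondeterministic {
    def children: Seq[Expr] = args
  }

  private def isOpaque(e: Expr): Boolean = e match {
    case _: NonSQLExpression | _: UserDefinedExpression | _: UserDefinedGenerator => true
    case _ => false
  }

  // Shape of the removed check: determinism AND no opaque markers.
  def isRepeatableExpression(e: Expr): Boolean = e.deterministic && !e.exists(isOpaque)

  // Shape of the new check: opaque markers only, no determinism test.
  def isScanCostBoundExpression(e: Expr): Boolean = !e.exists(isOpaque)
}
```

   In this model `Reflect(Seq(Column("k")))` is rejected by `isRepeatableExpression` but accepted by `isScanCostBoundExpression`, which is exactly the class of expression the marker-only check lets through.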
   
   I reproduced silent row loss on both the exact base and head commits, using an eager checkpoint and a counter-backed `reflect` projection. The fact side contains every key produced by the two evaluations; a merge-join target, together with a sibling broadcast of the same keys, exercises the default broadcast-reuse-only DPP path (`spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=true`). With cleanup excluded, base `75260f8` returns one target row and plans no DPP. Head `3571d121` reuses the sibling broadcast for pruning, re-evaluates the target join's build side to a different key, and returns zero rows.
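   A simplified, hypothetical simulation of the failure mode in plain Scala (not Spark): a counter-backed build side returns a different key on every evaluation, the pruning keys come from one evaluation (standing in for the reused sibling broadcast), and the join's build side comes from a second one. All names are invented for illustration, and the sketch ignores everything about the real plan except those two evaluations.

```scala
// Hypothetical simulation, not Spark code. The "build side" stands in for a
// counter-backed reflect() projection over a checkpoint: each evaluation
// yields a different key.
object RowLossSimulation {
  final case class FactRow(partKey: Int, payload: String)

  final class CountingBuildSide {
    private var evaluations = 0
    // Each call re-evaluates the nondeterministic projection.
    def evaluate(): Set[Int] = { evaluations += 1; Set(evaluations) }
  }

  // The fact side contains every key either evaluation can produce.
  val fact: Seq[FactRow] = Seq(FactRow(1, "a"), FactRow(2, "b"))

  def join(factSide: Seq[FactRow], buildKeys: Set[Int]): Seq[FactRow] =
    factSide.filter(r => buildKeys.contains(r.partKey))

  // Without DPP: the build side is evaluated once, for the join itself.
  def withoutDpp(): Seq[FactRow] = {
    val build = new CountingBuildSide
    join(fact, build.evaluate())
  }

  // With DPP: pruning keys come from one evaluation (the reused broadcast),
  // and the target join's build side is evaluated a second time.
  def withDpp(): Seq[FactRow] = {
    val build = new CountingBuildSide
    val pruningKeys = build.evaluate()
    val pruned = fact.filter(r => pruningKeys.contains(r.partKey))
    join(pruned, build.evaluate())
  }
}
```

   `withoutDpp()` keeps the single matching fact row, while `withDpp()` prunes to key 1 and then joins against key 2, so no row survives even though the unpruned fact side contained a match for each evaluation.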
   
   Please either retain the determinism check here or make `CleanupDynamicPruningFilters` non-excludable. Retaining the check does not remove any DPP that would survive normal optimization, because cleanup already discards these cases.
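   A toy model of the two proposed fixes, assuming the only question is whether a DPP filter over a nondeterministic build side reaches physical planning. The optimizer, the rule-name string, and the exclusion handling are hypothetical simplifications; the `nonExcludable` set is meant to mirror how, as this sketch assumes, Spark ignores an attempt to exclude a non-excludable rule.

```scala
// Hypothetical toy optimizer, not Spark's. Only the cleanup rule is modeled;
// names listed in `nonExcludable` are dropped from the user's exclusion list.
object ToyOptimizer {
  final case class DppFilter(buildDeterministic: Boolean)
  final case class Plan(dpp: Option[DppFilter])

  val Cleanup: String = "CleanupDynamicPruningFilters"

  // Insertion: checkDeterminism = true models the removed check,
  // false models the head revision.
  def insertDpp(buildDeterministic: Boolean, checkDeterminism: Boolean): Plan =
    if (checkDeterminism && !buildDeterministic) Plan(None)
    else Plan(Some(DppFilter(buildDeterministic)))

  // Cleanup: drop DPP filters whose build side is nondeterministic.
  def cleanup(plan: Plan): Plan = Plan(plan.dpp.filter(_.buildDeterministic))

  def optimize(plan: Plan, excludedRules: Set[String], nonExcludable: Set[String]): Plan = {
    val effectiveExcluded = excludedRules -- nonExcludable
    if (effectiveExcluded.contains(Cleanup)) plan else cleanup(plan)
  }

  // Does a DPP filter over a nondeterministic build side survive optimization?
  def nondeterministicDppSurvives(
      checkDeterminism: Boolean,
      excludedRules: Set[String],
      nonExcludable: Set[String]): Boolean =
    optimize(
      insertDpp(buildDeterministic = false, checkDeterminism = checkDeterminism),
      excludedRules,
      nonExcludable).dpp.isDefined
}
```

   In this model the filter survives only with the head behavior (`checkDeterminism = false`) combined with cleanup excluded and an empty `nonExcludable` set; either fix removes it. With the default (empty) exclusion list, both checks yield the same optimized plan for deterministic and nondeterministic build sides, which is the reviewer's point that retaining the check costs no surviving DPP.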



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
