cloud-fan commented on code in PR #56603:
URL: https://github.com/apache/spark/pull/56603#discussion_r3456998338


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
    * in bytes of the plan on the other side of the join. We estimate the filtering ratio
    * using column statistics if they are available, otherwise we use the config value of
    * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+   *
+   * The fallback ratio is only meaningful "when CBO stats are missing, but there is a predicate
+   * that is likely to be selective" -- so it is used only when `hasSelectivePredicate` is true. A
+   * filtering side that is eligible only because it is already materialized (a LocalRelation or a
+   * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; for it we rely solely on
+   * the statistics-based ratio and report no benefit when statistics are unavailable, so it is not
+   * injected as a standalone always-applied subquery on a guessed ratio. A statistics-based ratio,
+   * when available, is always honored regardless of `hasSelectivePredicate`.
    */
   private def pruningHasBenefit(
       partExpr: Expression,
       partPlan: LogicalPlan,
       otherExpr: Expression,
-      otherPlan: LogicalPlan): Boolean = {
+      otherPlan: LogicalPlan,
+      hasSelectivePredicate: Boolean): Boolean = {
 
     // get the distinct counts of an attribute for a given table
     def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
       plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
     }
 
-    // the default filtering ratio when CBO stats are missing, but there is a
-    // predicate that is likely to be selective
-    val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
-    // the filtering ratio based on the type of the join condition and on the column statistics
-    val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match {
-      // filter out expressions with more than one attribute on any side of the operator
-      case (leftAttr :: Nil, rightAttr :: Nil)
-        if conf.dynamicPartitionPruningUseStats =>
-          // get the CBO stats for each attribute in the join condition
-          val partDistinctCount = distinctCounts(leftAttr, partPlan)
-          val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
-          val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
-            otherDistinctCount.isDefined
-          if (!availableStats) {
-            fallbackRatio
-          } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
-            // there is likely an estimation error, so we fallback
-            fallbackRatio
-          } else {
-            1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble
-          }
-      case _ => fallbackRatio
+    // the filtering ratio derived from column statistics, when reliable stats are available
+    val statsBasedRatio: Option[Double] =
+      (partExpr.references.toList, otherExpr.references.toList) match {
+        // filter out expressions with more than one attribute on any side of the operator
+        case (leftAttr :: Nil, rightAttr :: Nil)
+          if conf.dynamicPartitionPruningUseStats =>
+            // get the CBO stats for each attribute in the join condition
+            val partDistinctCount = distinctCounts(leftAttr, partPlan)
+            // A materialized filtering side (e.g. a LocalRelation) may carry no column statistics
+            // but an exact `maxRows`, which is a conservative upper bound on its join-key NDV. Use
+            // it when the column statistic is missing so a small, selective materialized side still
+            // yields a statistics-based ratio rather than falling through to the gated fallback.
+            val otherDistinctCount =
+              distinctCounts(rightAttr, otherPlan).orElse(otherPlan.maxRows.map(BigInt(_)))

Review Comment:
   Fixed in 896ace23. You're right that this is a PR-introduced regression in 
the `fallbackFilterRatio=0` configuration, distinct from the general 
hidden-non-determinism limitation: the `maxRows` bound I added in `6d2c13c` was 
applied to *any* filtering side, so an opaque plan (the stateful 
`mapPartitions`) that becomes DPP-eligible through its selective `filter(p>0)` 
could derive a benefit from the `limit(1)`-supplied `maxRows` and get a 
standalone, always-applied subquery -- re-evaluating the side and dropping the 
rows the join's second evaluation needs.
   
   I now gate the `maxRows` bound on 
`isCheaplyRecomputableMaterializedPlan(otherPlan)`, exactly as you suggested. A 
side that isn't cheaply recomputable (an opaque `mapPartitions`, or a `limit` 
on top of one) no longer contributes a `maxRows`-based ratio, so with no usable 
stats it reports no benefit and isn't injected standalone; the intended 
`LocalRelation` / checkpointed `LogicalRDD` cases are unaffected. This isn't a 
general repeatability gate in the benefit estimator -- `maxRows` is a sound NDV 
bound only for a materialized/bounded side in the first place, so gating it on 
the eligibility predicate puts the check where it belongs.
   
   Added a regression test (`mappedKeys.filter($"p" > 0).limit(1)` with 
`fallbackFilterRatio=0`): it asserts no standalone DPP and a correct result. I 
verified it fails on the prior revision with exactly your symptom (`ArraySeq() 
did not equal List([1])` -- the empty double-evaluation result) and passes now.
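   A minimal sketch of the benefit gate described above, written as plain Scala rather than against Catalyst. Everything in it is hypothetical: `Side`, `statsBasedRatio`, `filterRatio` and the simplified `pruningHasBenefit` only model the decision, and the final size comparison is an assumed simplification of the real estimator. It encodes the two rules from this thread: `maxRows` stands in for a missing NDV only when the side is cheaply recomputable, and the configured fallback ratio is used only when there is a selective predicate. Treating `partNdv <= otherNdv` as "no reliable stats" is also an assumption, mirroring the old `fallbackRatio` branch.

```scala
// Hypothetical model of one join side; not Catalyst's LogicalPlan.
final case class Side(
    distinctCount: Option[BigInt], // column-stat NDV of the join key, if known
    maxRows: Option[Long],         // exact row bound (e.g. LocalRelation, limit)
    sizeInBytes: BigInt,
    cheaplyRecomputable: Boolean)  // stand-in for isCheaplyRecomputableMaterializedPlan

// Ratio derived from statistics only; None means "no reliable statistics".
def statsBasedRatio(part: Side, other: Side): Option[Double] = {
  // maxRows bounds the other side's NDV only when that side is cheaply recomputable.
  val otherNdv: Option[BigInt] = other.distinctCount.orElse(
    if (other.cheaplyRecomputable) other.maxRows.map(BigInt(_)) else None)
  (part.distinctCount, otherNdv) match {
    case (Some(p), Some(o)) if p > 0 && p.toDouble > o.toDouble =>
      Some(1 - o.toDouble / p.toDouble)
    // Missing stats, or partNdv <= otherNdv (a likely estimation error).
    case _ => None
  }
}

// A statistics-based ratio always wins; the fallback needs a selective predicate.
def filterRatio(
    part: Side,
    other: Side,
    hasSelectivePredicate: Boolean,
    fallbackRatio: Double): Option[Double] =
  statsBasedRatio(part, other).orElse(
    if (hasSelectivePredicate) Some(fallbackRatio) else None)

// Assumed simplification of the cost check: prune only if the bytes expected
// to be skipped on the partitioned side exceed the filtering side's size.
def pruningHasBenefit(
    part: Side,
    other: Side,
    hasSelectivePredicate: Boolean,
    fallbackRatio: Double): Boolean =
  filterRatio(part, other, hasSelectivePredicate, fallbackRatio)
    .exists(r => r * part.sizeInBytes.toDouble > other.sizeInBytes.toDouble)
```

   With `fallbackRatio = 0.0`, an opaque side bounded by `limit(1)` but not cheaply recomputable gets no ratio from `maxRows`, so `pruningHasBenefit` is false, while a one-row `LocalRelation` still gets a stats-based ratio. Returning `Option` instead of a sentinel value keeps "no reliable statistics" distinct from a genuine ratio of 0, which is what lets the fallback be gated separately.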



##########
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala:
##########
@@ -1880,33 +1992,20 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
           DppMaterializedInputTestState.reset(counterId)
           assert(df.collect().toSeq === Seq(Row(1)))
           assert(activeDppSubqueries(df).isEmpty,
-            s"Shouldn't trigger DPP for an opaque materialized plan:\n" +
+            s"Shouldn't execute standalone DPP for this filtering side:\n" +
               df.queryExecution)
         }
 
+        // A LocalRelation is cheaply recomputable and exposes an exact maxRows, a conservative
+        // bound on its join-key NDV, so its pruning benefit is estimable and it gets a standalone
+        // DPP subquery.
         checkStandaloneDpp(Seq(1).toDF("p"))

Review Comment:
   Good catch -- restored in 896ace23. You're right that the merge resolution 
dropped the coverage for the cheap `Filter`/`Project` (eligible), `Aggregate` 
(excluded), and UDF-projection (excluded) branches of 
`isCheaplyRecomputableMaterializedPlan`. Those fixtures previously forced 
benefit through `fallbackFilterRatio=1000`, which this PR gates, so I couldn't 
keep them as standalone-DPP cases (a checkpointed `Seq(1)` has neither stats 
nor `maxRows`).
   
   Following your suggestion, I re-established them through **broadcast 
reuse**, which isolates the eligibility decision from the benefit term: a 
cheaply-recomputable `Filter`/`Project` over a checkpointed input reuses the 
join's broadcast for DPP, while the `Aggregate` and UDF-projection sides are 
not DPP-eligible even with a reusable broadcast present. So a regression that 
wrongly admits `Aggregate`/UDF (or drops the cheap `Filter`/`Project`) now 
fails CI again.
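   The eligibility split that these restored tests pin down can be sketched as a toy predicate. The plan ADT and `cheaplyRecomputable` below are hypothetical stand-ins for Catalyst nodes and `isCheaplyRecomputableMaterializedPlan`, and the real predicate may check more conditions. The sketch only encodes the branches named in this thread: a `LocalRelation` or checkpointed `LogicalRDD`, optionally under cheap `Filter`/`Project` nodes, is eligible, while an `Aggregate`, a UDF projection, or an opaque `mapPartitions` is not.

```scala
// Hypothetical, stripped-down plan nodes; not Catalyst's classes.
sealed trait Plan
case object LocalRelation extends Plan
case object CheckpointedRDD extends Plan                   // checkpoint-derived LogicalRDD
final case class Filter(child: Plan) extends Plan
final case class Project(hasUdf: Boolean, child: Plan) extends Plan
final case class Aggregate(child: Plan) extends Plan
final case class MapPartitions(child: Plan) extends Plan   // opaque user code

// Toy stand-in for isCheaplyRecomputableMaterializedPlan: a materialized leaf,
// optionally wrapped in cheap Filter/Project nodes that contain no UDFs.
def cheaplyRecomputable(plan: Plan): Boolean = plan match {
  case LocalRelation | CheckpointedRDD => true
  case Filter(child)                   => cheaplyRecomputable(child)
  case Project(hasUdf, child)          => !hasUdf && cheaplyRecomputable(child)
  case _: Aggregate | _: MapPartitions => false
}
```

   Keeping eligibility as a predicate separate from the benefit estimate is what lets the broadcast-reuse tests exercise each branch without depending on `fallbackFilterRatio`.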



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
