sunchao commented on code in PR #56603:
URL: https://github.com/apache/spark/pull/56603#discussion_r3455505138


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
    * in bytes of the plan on the other side of the join. We estimate the filtering ratio
    * using column statistics if they are available, otherwise we use the config value of
    * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+   *
+   * The fallback ratio is only meaningful "when CBO stats are missing, but there is a predicate
+   * that is likely to be selective" -- so it is used only when `hasSelectivePredicate` is true. A
+   * filtering side that is eligible only because it is already materialized (a LocalRelation or a
+   * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; for it we rely solely on
+   * the statistics-based ratio and report no benefit when statistics are unavailable, so it is not
+   * injected as a standalone always-applied subquery on a guessed ratio. A statistics-based ratio,
+   * when available, is always honored regardless of `hasSelectivePredicate`.
    */
   private def pruningHasBenefit(
       partExpr: Expression,
       partPlan: LogicalPlan,
       otherExpr: Expression,
-      otherPlan: LogicalPlan): Boolean = {
+      otherPlan: LogicalPlan,
+      hasSelectivePredicate: Boolean): Boolean = {
 
     // get the distinct counts of an attribute for a given table
     def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
       plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
     }
 
-    // the default filtering ratio when CBO stats are missing, but there is a
-    // predicate that is likely to be selective
-    val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
-    // the filtering ratio based on the type of the join condition and on the column statistics
-    val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match {
-      // filter out expressions with more than one attribute on any side of the operator
-      case (leftAttr :: Nil, rightAttr :: Nil)
-        if conf.dynamicPartitionPruningUseStats =>
-          // get the CBO stats for each attribute in the join condition
-          val partDistinctCount = distinctCounts(leftAttr, partPlan)
-          val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
-          val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
-            otherDistinctCount.isDefined
-          if (!availableStats) {
-            fallbackRatio
-          } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
-            // there is likely an estimation error, so we fallback
-            fallbackRatio
-          } else {
-            1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble
-          }
-      case _ => fallbackRatio
+    // the filtering ratio derived from column statistics, when reliable stats are available
+    val statsBasedRatio: Option[Double] =
+      (partExpr.references.toList, otherExpr.references.toList) match {
+        // filter out expressions with more than one attribute on any side of the operator
+        case (leftAttr :: Nil, rightAttr :: Nil)
+          if conf.dynamicPartitionPruningUseStats =>
+            // get the CBO stats for each attribute in the join condition
+            val partDistinctCount = distinctCounts(leftAttr, partPlan)

Review Comment:
   [P2] Rechecked after the #56636 rebase against the exact current base 
`acb7de23`: this regression is still present. For `p AS alias_p` with fact NDV 
20 and filtering-side NDV 1, the base uses standalone DPP and scans 1 of 20 
partitions, while head `7ed155cb` misses the leaf statistic and scans all 20.
   
   Although the lineage mismatch itself predates this PR, gating the fallback 
turns that previously masked miss into the base-to-head performance regression. 
Could we carry the expression resolved by `findExpressionAndTrackLineageDown` 
into the leaf NDV lookup?
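
To make the numbers above concrete, here is a minimal sketch of the ratio arithmetic on plain values rather than plan statistics. `FallbackGatingSketch`, `baseRatio`, and `headRatio` are hypothetical names, and `headRatio` only models the gating as the new doc comment describes it; it is not the PR's actual code.

```scala
object FallbackGatingSketch {
  // Same arithmetic as the stats branch in the diff, on plain values.
  def statsBasedRatio(partNdv: Option[BigInt], otherNdv: Option[BigInt]): Option[Double] =
    (partNdv, otherNdv) match {
      case (Some(p), Some(o)) if p > 0 && p.toDouble > o.toDouble =>
        Some(1 - o.toDouble / p.toDouble)
      case _ => None // missing stats, or a likely estimation error
    }

  // Base behavior (the removed lines): any miss silently becomes the fallback ratio.
  def baseRatio(stats: Option[Double], fallbackRatio: Double): Double =
    stats.getOrElse(fallbackRatio)

  // Head behavior as described by the doc comment (assumed combination):
  // the fallback applies only when a selective predicate is present.
  def headRatio(
      stats: Option[Double],
      hasSelectivePredicate: Boolean,
      fallbackRatio: Double): Option[Double] =
    stats.orElse(if (hasSelectivePredicate) Some(fallbackRatio) else None)
}
```

With fact NDV 20 and filtering-side NDV 1 the stats-based ratio is 1 - 1/20, roughly 0.95. If the lookup misses the leaf statistic, `statsBasedRatio` yields `None`; the base would still have applied the fallback, whereas the gated head reports no ratio at all for a side without a selective predicate.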



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
    * in bytes of the plan on the other side of the join. We estimate the filtering ratio
    * using column statistics if they are available, otherwise we use the config value of
    * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+   *
+   * The fallback ratio is only meaningful "when CBO stats are missing, but there is a predicate
+   * that is likely to be selective" -- so it is used only when `hasSelectivePredicate` is true. A
+   * filtering side that is eligible only because it is already materialized (a LocalRelation or a
+   * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; for it we rely solely on
+   * the statistics-based ratio and report no benefit when statistics are unavailable, so it is not
+   * injected as a standalone always-applied subquery on a guessed ratio. A statistics-based ratio,
+   * when available, is always honored regardless of `hasSelectivePredicate`.
    */
   private def pruningHasBenefit(
       partExpr: Expression,
       partPlan: LogicalPlan,
       otherExpr: Expression,
-      otherPlan: LogicalPlan): Boolean = {
+      otherPlan: LogicalPlan,
+      hasSelectivePredicate: Boolean): Boolean = {
 
     // get the distinct counts of an attribute for a given table
     def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
       plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
     }
 
-    // the default filtering ratio when CBO stats are missing, but there is a
-    // predicate that is likely to be selective
-    val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
-    // the filtering ratio based on the type of the join condition and on the column statistics
-    val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match {
-      // filter out expressions with more than one attribute on any side of the operator
-      case (leftAttr :: Nil, rightAttr :: Nil)
-        if conf.dynamicPartitionPruningUseStats =>
-          // get the CBO stats for each attribute in the join condition
-          val partDistinctCount = distinctCounts(leftAttr, partPlan)
-          val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
-          val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
-            otherDistinctCount.isDefined
-          if (!availableStats) {
-            fallbackRatio
-          } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
-            // there is likely an estimation error, so we fallback
-            fallbackRatio
-          } else {
-            1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble
-          }
-      case _ => fallbackRatio
+    // the filtering ratio derived from column statistics, when reliable stats are available
+    val statsBasedRatio: Option[Double] =
+      (partExpr.references.toList, otherExpr.references.toList) match {
+        // filter out expressions with more than one attribute on any side of the operator
+        case (leftAttr :: Nil, rightAttr :: Nil)
+          if conf.dynamicPartitionPruningUseStats =>
+            // get the CBO stats for each attribute in the join condition
+            val partDistinctCount = distinctCounts(leftAttr, partPlan)
+            // A materialized filtering side (e.g. a LocalRelation) may carry no column statistics
+            // but an exact `maxRows`, which is a conservative upper bound on its join-key NDV. Use
+            // it when the column statistic is missing so a small, selective materialized side still
+            // yields a statistics-based ratio rather than falling through to the gated fallback.
+            val otherDistinctCount =
+              distinctCounts(rightAttr, otherPlan).orElse(otherPlan.maxRows.map(BigInt(_)))
+            val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
+              otherDistinctCount.isDefined
+            if (!availableStats) {
+              None
+            } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
+              // there is likely an estimation error, so there is no reliable stats-based ratio
+              None
+            } else {
+              Some(1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble)

Review Comment:
   [P2] I reran this against current base `acb7de23` and head `7ed155cb` after 
the rebase. With all partition values even and the join key `p % 2`, the base 
at `fallbackFilterRatio=0` creates no standalone DPP; the head uses `NDV(p)=20` 
plus `maxRows=1`, creates a standalone DPP subquery, and still scans all 20 
partitions.
   
   So the imprecise expression-NDV lookup predates this PR, but the new 
row-bound branch newly activates the wasted subquery in this configuration. 
Please use the leaf column NDV only when the lineage-resolved partition key is 
itself an `Attribute`, unless expression-level NDV is available.
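
A minimal sketch of the guard requested above, using hypothetical stand-in types rather than Catalyst classes. `Expr`, `Attr`, `Mod`, and `partitionKeyNdv` are illustrative names only, and the sketch assumes no expression-level NDV is available.

```scala
object LineageGuardSketch {
  // Hypothetical stand-ins for Catalyst expressions; these are not Spark classes.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Mod(child: Expr, divisor: Int) extends Expr

  // Trust a leaf column NDV only when the lineage-resolved key is a plain attribute.
  def partitionKeyNdv(resolvedKey: Expr, leafNdv: Map[String, BigInt]): Option[BigInt] =
    resolvedKey match {
      case Attr(name) => leafNdv.get(name)
      case _          => None // no expression-level NDV in this sketch
    }

  // Same ratio arithmetic as the stats branch in the diff.
  def statsBasedRatio(partNdv: Option[BigInt], otherNdv: Option[BigInt]): Option[Double] =
    (partNdv, otherNdv) match {
      case (Some(p), Some(o)) if p > 0 && p.toDouble > o.toDouble =>
        Some(1 - o.toDouble / p.toDouble)
      case _ => None
    }
}
```

For the `p % 2` case, `partitionKeyNdv(Mod(Attr("p"), 2), Map("p" -> BigInt(20)))` returns `None`, so no stats-based ratio is produced from `NDV(p)=20` and `maxRows=1`. Without the guard the same inputs give a ratio of about 0.95, even though `p % 2` takes a single value when every partition value is even and nothing can be pruned.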



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

