cloud-fan commented on code in PR #56603:
URL: https://github.com/apache/spark/pull/56603#discussion_r3448083454


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with 
PredicateHelper with Join
    * in bytes of the plan on the other side of the join. We estimate the 
filtering ratio
    * using column statistics if they are available, otherwise we use the 
config value of
    * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+   *
+   * The fallback ratio is only meaningful "when CBO stats are missing, but 
there is a predicate
+   * that is likely to be selective" -- so it is used only when 
`hasSelectivePredicate` is true. A
+   * filtering side that is eligible only because it is already materialized 
(a LocalRelation or a
+   * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; 
for it we rely solely on
+   * the statistics-based ratio and report no benefit when statistics are 
unavailable, so it is not
+   * injected as a standalone always-applied subquery on a guessed ratio. A 
statistics-based ratio,
+   * when available, is always honored regardless of `hasSelectivePredicate`.
    */
   private def pruningHasBenefit(
       partExpr: Expression,
       partPlan: LogicalPlan,
       otherExpr: Expression,
-      otherPlan: LogicalPlan): Boolean = {
+      otherPlan: LogicalPlan,
+      hasSelectivePredicate: Boolean): Boolean = {
 
     // get the distinct counts of an attribute for a given table
     def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
       plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
     }
 
-    // the default filtering ratio when CBO stats are missing, but there is a
-    // predicate that is likely to be selective
-    val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
-    // the filtering ratio based on the type of the join condition and on the 
column statistics
-    val filterRatio = (partExpr.references.toList, 
otherExpr.references.toList) match {
-      // filter out expressions with more than one attribute on any side of 
the operator
-      case (leftAttr :: Nil, rightAttr :: Nil)
-        if conf.dynamicPartitionPruningUseStats =>
-          // get the CBO stats for each attribute in the join condition
-          val partDistinctCount = distinctCounts(leftAttr, partPlan)
-          val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
-          val availableStats = partDistinctCount.isDefined && 
partDistinctCount.get > 0 &&
-            otherDistinctCount.isDefined
-          if (!availableStats) {
-            fallbackRatio
-          } else if (partDistinctCount.get.toDouble <= 
otherDistinctCount.get.toDouble) {
-            // there is likely an estimation error, so we fallback
-            fallbackRatio
-          } else {
-            1 - otherDistinctCount.get.toDouble / 
partDistinctCount.get.toDouble
-          }
-      case _ => fallbackRatio
+    // the filtering ratio derived from column statistics, when reliable stats 
are available
+    val statsBasedRatio: Option[Double] =
+      (partExpr.references.toList, otherExpr.references.toList) match {
+        // filter out expressions with more than one attribute on any side of 
the operator
+        case (leftAttr :: Nil, rightAttr :: Nil)
+          if conf.dynamicPartitionPruningUseStats =>
+            // get the CBO stats for each attribute in the join condition
+            val partDistinctCount = distinctCounts(leftAttr, partPlan)
+            // A materialized filtering side (e.g. a LocalRelation) may carry 
no column statistics
+            // but an exact `maxRows`, which is a conservative upper bound on 
its join-key NDV. Use
+            // it when the column statistic is missing so a small, selective 
materialized side still
+            // yields a statistics-based ratio rather than falling through to 
the gated fallback.
+            val otherDistinctCount =
+              distinctCounts(rightAttr, 
otherPlan).orElse(otherPlan.maxRows.map(BigInt(_)))

Review Comment:
   Thanks for the detailed repro. I believe this is the general DPP 
re-evaluation limitation rather than something this PR introduces. The 
filtering side here contains `filter(p > 0)`, a selective predicate, so it's 
DPP-eligible via the `hasSelectivePredicate` path independent of 
materialization. With the default `fallbackFilterRatio = 0.5` it already gets 
standalone DPP and produces the same wrong result on master today; the 
`maxRows` bound only changes behavior in the non-default `fallbackFilterRatio = 
0` setting.
   
   More fundamentally, Spark cannot decide a plan's repeatability in general 
(opaque RDD/closure non-determinism is invisible to Catalyst), so the 
selective-predicate path has exactly the same exposure. This is being discussed 
in #56636, which argues repeatability is a DPP-wide concern and removes the 
materialized-input-specific `isRepeatableMaterializedPlan` gate. I'd rather not 
re-introduce a repeatability check inside `pruningHasBenefit`, which is a 
benefit estimator, not a correctness gate. If we want to address non-repeatable 
re-evaluation it should be a uniform DPP-wide change, out of scope here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with 
PredicateHelper with Join
    * in bytes of the plan on the other side of the join. We estimate the 
filtering ratio
    * using column statistics if they are available, otherwise we use the 
config value of
    * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+   *
+   * The fallback ratio is only meaningful "when CBO stats are missing, but 
there is a predicate
+   * that is likely to be selective" -- so it is used only when 
`hasSelectivePredicate` is true. A
+   * filtering side that is eligible only because it is already materialized 
(a LocalRelation or a
+   * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; 
for it we rely solely on
+   * the statistics-based ratio and report no benefit when statistics are 
unavailable, so it is not
+   * injected as a standalone always-applied subquery on a guessed ratio. A 
statistics-based ratio,
+   * when available, is always honored regardless of `hasSelectivePredicate`.
    */
   private def pruningHasBenefit(
       partExpr: Expression,
       partPlan: LogicalPlan,
       otherExpr: Expression,
-      otherPlan: LogicalPlan): Boolean = {
+      otherPlan: LogicalPlan,
+      hasSelectivePredicate: Boolean): Boolean = {
 
     // get the distinct counts of an attribute for a given table
     def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
       plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
     }
 
-    // the default filtering ratio when CBO stats are missing, but there is a
-    // predicate that is likely to be selective
-    val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
-    // the filtering ratio based on the type of the join condition and on the 
column statistics
-    val filterRatio = (partExpr.references.toList, 
otherExpr.references.toList) match {
-      // filter out expressions with more than one attribute on any side of 
the operator
-      case (leftAttr :: Nil, rightAttr :: Nil)
-        if conf.dynamicPartitionPruningUseStats =>
-          // get the CBO stats for each attribute in the join condition
-          val partDistinctCount = distinctCounts(leftAttr, partPlan)
-          val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
-          val availableStats = partDistinctCount.isDefined && 
partDistinctCount.get > 0 &&
-            otherDistinctCount.isDefined
-          if (!availableStats) {
-            fallbackRatio
-          } else if (partDistinctCount.get.toDouble <= 
otherDistinctCount.get.toDouble) {
-            // there is likely an estimation error, so we fallback
-            fallbackRatio
-          } else {
-            1 - otherDistinctCount.get.toDouble / 
partDistinctCount.get.toDouble
-          }
-      case _ => fallbackRatio
+    // the filtering ratio derived from column statistics, when reliable stats 
are available
+    val statsBasedRatio: Option[Double] =
+      (partExpr.references.toList, otherExpr.references.toList) match {
+        // filter out expressions with more than one attribute on any side of 
the operator
+        case (leftAttr :: Nil, rightAttr :: Nil)
+          if conf.dynamicPartitionPruningUseStats =>
+            // get the CBO stats for each attribute in the join condition
+            val partDistinctCount = distinctCounts(leftAttr, partPlan)

Review Comment:
   Good catch, and you're right that the lineage mismatch predates this PR -- 
`getFilterableTableScan` resolves the expression through projections/aliases 
but the NDV lookup pairs the original join key against the leaf's 
`AttributeMap`. This is a general DPP stats-estimation issue: it affects the 
selective-predicate path too, and it only ever costs a missed optimization, 
never correctness. This PR doesn't touch `getFilterableTableScan` or the NDV 
lookup, so it's not introduced here. I'd prefer to fix it as a separate change 
(carrying the resolved leaf attribute into the lookup) that benefits all of 
DPP, rather than expand the scope of this follow-up.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with 
PredicateHelper with Join
    * in bytes of the plan on the other side of the join. We estimate the 
filtering ratio
    * using column statistics if they are available, otherwise we use the 
config value of
    * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+   *
+   * The fallback ratio is only meaningful "when CBO stats are missing, but 
there is a predicate
+   * that is likely to be selective" -- so it is used only when 
`hasSelectivePredicate` is true. A
+   * filtering side that is eligible only because it is already materialized 
(a LocalRelation or a
+   * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; 
for it we rely solely on
+   * the statistics-based ratio and report no benefit when statistics are 
unavailable, so it is not
+   * injected as a standalone always-applied subquery on a guessed ratio. A 
statistics-based ratio,
+   * when available, is always honored regardless of `hasSelectivePredicate`.
    */
   private def pruningHasBenefit(
       partExpr: Expression,
       partPlan: LogicalPlan,
       otherExpr: Expression,
-      otherPlan: LogicalPlan): Boolean = {
+      otherPlan: LogicalPlan,
+      hasSelectivePredicate: Boolean): Boolean = {
 
     // get the distinct counts of an attribute for a given table
     def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
       plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
     }
 
-    // the default filtering ratio when CBO stats are missing, but there is a
-    // predicate that is likely to be selective
-    val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
-    // the filtering ratio based on the type of the join condition and on the 
column statistics
-    val filterRatio = (partExpr.references.toList, 
otherExpr.references.toList) match {
-      // filter out expressions with more than one attribute on any side of 
the operator
-      case (leftAttr :: Nil, rightAttr :: Nil)
-        if conf.dynamicPartitionPruningUseStats =>
-          // get the CBO stats for each attribute in the join condition
-          val partDistinctCount = distinctCounts(leftAttr, partPlan)
-          val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
-          val availableStats = partDistinctCount.isDefined && 
partDistinctCount.get > 0 &&
-            otherDistinctCount.isDefined
-          if (!availableStats) {
-            fallbackRatio
-          } else if (partDistinctCount.get.toDouble <= 
otherDistinctCount.get.toDouble) {
-            // there is likely an estimation error, so we fallback
-            fallbackRatio
-          } else {
-            1 - otherDistinctCount.get.toDouble / 
partDistinctCount.get.toDouble
-          }
-      case _ => fallbackRatio
+    // the filtering ratio derived from column statistics, when reliable stats 
are available
+    val statsBasedRatio: Option[Double] =
+      (partExpr.references.toList, otherExpr.references.toList) match {
+        // filter out expressions with more than one attribute on any side of 
the operator
+        case (leftAttr :: Nil, rightAttr :: Nil)
+          if conf.dynamicPartitionPruningUseStats =>
+            // get the CBO stats for each attribute in the join condition
+            val partDistinctCount = distinctCounts(leftAttr, partPlan)
+            // A materialized filtering side (e.g. a LocalRelation) may carry 
no column statistics
+            // but an exact `maxRows`, which is a conservative upper bound on 
its join-key NDV. Use
+            // it when the column statistic is missing so a small, selective 
materialized side still
+            // yields a statistics-based ratio rather than falling through to 
the gated fallback.
+            val otherDistinctCount =
+              distinctCounts(rightAttr, 
otherPlan).orElse(otherPlan.maxRows.map(BigInt(_)))
+            val availableStats = partDistinctCount.isDefined && 
partDistinctCount.get > 0 &&
+              otherDistinctCount.isDefined
+            if (!availableStats) {
+              None
+            } else if (partDistinctCount.get.toDouble <= 
otherDistinctCount.get.toDouble) {
+              // there is likely an estimation error, so there is no reliable 
stats-based ratio
+              None
+            } else {
+              Some(1 - otherDistinctCount.get.toDouble / 
partDistinctCount.get.toDouble)

Review Comment:
   Agreed this over-estimates when the key is a transformation like `p % 2`, 
and as you note it predates this PR -- it's a pre-existing imprecision in 
`statsBasedRatio` that applies to the selective-predicate + fallback path as 
well. The effect is a no-benefit subquery (wasted work), not a wrong result. 
It's a general DPP benefit-estimation improvement (use the column NDV only when 
the resolved key is itself an `Attribute`, or use expression-level NDV) and is 
orthogonal to this PR's goal of not injecting no-benefit DPP for materialized 
sides, so I'd like to handle it separately together with the lineage fix above.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with 
PredicateHelper with Join
    * in bytes of the plan on the other side of the join. We estimate the 
filtering ratio
    * using column statistics if they are available, otherwise we use the 
config value of
    * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+   *
+   * The fallback ratio is only meaningful "when CBO stats are missing, but 
there is a predicate
+   * that is likely to be selective" -- so it is used only when 
`hasSelectivePredicate` is true. A
+   * filtering side that is eligible only because it is already materialized 
(a LocalRelation or a
+   * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; 
for it we rely solely on
+   * the statistics-based ratio and report no benefit when statistics are 
unavailable, so it is not
+   * injected as a standalone always-applied subquery on a guessed ratio. A 
statistics-based ratio,
+   * when available, is always honored regardless of `hasSelectivePredicate`.
    */
   private def pruningHasBenefit(
       partExpr: Expression,
       partPlan: LogicalPlan,
       otherExpr: Expression,
-      otherPlan: LogicalPlan): Boolean = {
+      otherPlan: LogicalPlan,
+      hasSelectivePredicate: Boolean): Boolean = {
 
     // get the distinct counts of an attribute for a given table
     def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
       plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
     }
 
-    // the default filtering ratio when CBO stats are missing, but there is a
-    // predicate that is likely to be selective
-    val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
-    // the filtering ratio based on the type of the join condition and on the 
column statistics
-    val filterRatio = (partExpr.references.toList, 
otherExpr.references.toList) match {
-      // filter out expressions with more than one attribute on any side of 
the operator
-      case (leftAttr :: Nil, rightAttr :: Nil)
-        if conf.dynamicPartitionPruningUseStats =>
-          // get the CBO stats for each attribute in the join condition
-          val partDistinctCount = distinctCounts(leftAttr, partPlan)
-          val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
-          val availableStats = partDistinctCount.isDefined && 
partDistinctCount.get > 0 &&
-            otherDistinctCount.isDefined
-          if (!availableStats) {
-            fallbackRatio
-          } else if (partDistinctCount.get.toDouble <= 
otherDistinctCount.get.toDouble) {
-            // there is likely an estimation error, so we fallback
-            fallbackRatio
-          } else {
-            1 - otherDistinctCount.get.toDouble / 
partDistinctCount.get.toDouble
-          }
-      case _ => fallbackRatio
+    // the filtering ratio derived from column statistics, when reliable stats 
are available
+    val statsBasedRatio: Option[Double] =
+      (partExpr.references.toList, otherExpr.references.toList) match {

Review Comment:
   This is a reasonable coverage improvement -- `maxRows` does bound the NDV of 
any expression over those rows, so the single-reference requirement on the 
filtering key is stricter than necessary. But it expands DPP to cases that get 
none today, whereas this PR is narrowly about *removing* no-benefit DPP for 
materialized sides; it's a pre-existing limitation of the stats path, not 
something introduced here. I'd prefer to do it as a separate follow-up. (And 
the repeatability caveat is moot once #56636 removes 
`isRepeatableMaterializedPlan`.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to