cloud-fan commented on code in PR #56603:
URL: https://github.com/apache/spark/pull/56603#discussion_r3457126300
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
* in bytes of the plan on the other side of the join. We estimate the filtering ratio
* using column statistics if they are available, otherwise we use the config value of
* `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+ *
+ * The fallback ratio is only meaningful "when CBO stats are missing, but there is a predicate
+ * that is likely to be selective" -- so it is used only when `hasSelectivePredicate` is true. A
+ * filtering side that is eligible only because it is already materialized (a LocalRelation or a
+ * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; for it we rely solely on
+ * the statistics-based ratio and report no benefit when statistics are unavailable, so it is not
+ * injected as a standalone always-applied subquery on a guessed ratio. A statistics-based ratio,
+ * when available, is always honored regardless of `hasSelectivePredicate`.
*/
private def pruningHasBenefit(
partExpr: Expression,
partPlan: LogicalPlan,
otherExpr: Expression,
- otherPlan: LogicalPlan): Boolean = {
+ otherPlan: LogicalPlan,
+ hasSelectivePredicate: Boolean): Boolean = {
// get the distinct counts of an attribute for a given table
def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
}
- // the default filtering ratio when CBO stats are missing, but there is a
- // predicate that is likely to be selective
- val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
- // the filtering ratio based on the type of the join condition and on the column statistics
- val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match {
- // filter out expressions with more than one attribute on any side of the operator
- case (leftAttr :: Nil, rightAttr :: Nil)
- if conf.dynamicPartitionPruningUseStats =>
- // get the CBO stats for each attribute in the join condition
- val partDistinctCount = distinctCounts(leftAttr, partPlan)
- val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
- val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
- otherDistinctCount.isDefined
- if (!availableStats) {
- fallbackRatio
- } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
- // there is likely an estimation error, so we fallback
- fallbackRatio
- } else {
- 1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble
- }
- case _ => fallbackRatio
+ // the filtering ratio derived from column statistics, when reliable stats are available
+ val statsBasedRatio: Option[Double] =
+ (partExpr.references.toList, otherExpr.references.toList) match {
Review Comment:
Agreed, this is a real coverage gap -- `maxRows` does bound the NDV of any
expression over those rows, so the single-reference requirement on the
filtering key is stricter than necessary. But it's a pre-existing property of
the stats path, and relaxing it *expands* DPP to cases that get none today,
which is the opposite direction from this follow-up (whose goal is to stop
injecting no-benefit DPP for materialized sides). I'd rather not widen the
scope here; this feels like a separate, standalone improvement to the benefit
estimator. Happy to follow up on it separately.
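The ratio selection that the updated doc comment describes (a statistics-based ratio first, with `maxRows` as an NDV upper bound for a materialized side, and the fallback ratio only when `hasSelectivePredicate` is true) can be modeled roughly as below. This is a minimal sketch under stated assumptions: `GatedBenefitSketch`, `SideInfo`, and `filterRatio` are hypothetical names, each join side is reduced to an optional key NDV and an optional row bound, and the single-reference and `dynamicPartitionPruningUseStats` guards are left out. A result of `None` stands for "report no benefit".

```scala
// Hypothetical, simplified model of the ratio selection in pruningHasBenefit.
// It does not use Spark's LogicalPlan API; each join side is reduced to the
// two numbers the decision needs.
object GatedBenefitSketch {

  /** Hypothetical summary of one join side. */
  final case class SideInfo(
      keyNdv: Option[BigInt],        // column-statistics NDV of the join key, if any
      maxRows: Option[Long] = None)  // exact row bound, e.g. for a LocalRelation

  /** Stats-based ratio, or None when stats are missing or look inconsistent. */
  def statsBasedRatio(part: SideInfo, other: SideInfo): Option[Double] = {
    // Use maxRows as a conservative upper bound when the column NDV is missing.
    val otherNdv = other.keyNdv.orElse(other.maxRows.map(BigInt(_)))
    (part.keyNdv, otherNdv) match {
      // A partition-side NDV that does not exceed the other side's is treated
      // as a likely estimation error, so it yields no ratio.
      case (Some(p), Some(o)) if p > 0 && p > o =>
        Some(1 - o.toDouble / p.toDouble)
      case _ => None
    }
  }

  /**
   * Ratio actually used: the stats-based ratio when available, otherwise the
   * fallback ratio, but only if a selective predicate was found. None means
   * "report no benefit".
   */
  def filterRatio(
      part: SideInfo,
      other: SideInfo,
      hasSelectivePredicate: Boolean,
      fallbackFilterRatio: Double): Option[Double] =
    statsBasedRatio(part, other).orElse {
      if (hasSelectivePredicate) Some(fallbackFilterRatio) else None
    }
}
```

In this model, a side with neither column statistics nor a row bound gets the fallback ratio only when `hasSelectivePredicate` is true, which is the gating this follow-up adds. A materialized side with `maxRows = 10` against a partition key with NDV 1000 still yields a ratio of about 0.99 without the gate.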
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
* in bytes of the plan on the other side of the join. We estimate the filtering ratio
* using column statistics if they are available, otherwise we use the config value of
* `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+ *
+ * The fallback ratio is only meaningful "when CBO stats are missing, but there is a predicate
+ * that is likely to be selective" -- so it is used only when `hasSelectivePredicate` is true. A
+ * filtering side that is eligible only because it is already materialized (a LocalRelation or a
+ * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; for it we rely solely on
+ * the statistics-based ratio and report no benefit when statistics are unavailable, so it is not
+ * injected as a standalone always-applied subquery on a guessed ratio. A statistics-based ratio,
+ * when available, is always honored regardless of `hasSelectivePredicate`.
*/
private def pruningHasBenefit(
partExpr: Expression,
partPlan: LogicalPlan,
otherExpr: Expression,
- otherPlan: LogicalPlan): Boolean = {
+ otherPlan: LogicalPlan,
+ hasSelectivePredicate: Boolean): Boolean = {
// get the distinct counts of an attribute for a given table
def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
}
- // the default filtering ratio when CBO stats are missing, but there is a
- // predicate that is likely to be selective
- val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
- // the filtering ratio based on the type of the join condition and on the column statistics
- val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match {
- // filter out expressions with more than one attribute on any side of the operator
- case (leftAttr :: Nil, rightAttr :: Nil)
- if conf.dynamicPartitionPruningUseStats =>
- // get the CBO stats for each attribute in the join condition
- val partDistinctCount = distinctCounts(leftAttr, partPlan)
- val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
- val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
- otherDistinctCount.isDefined
- if (!availableStats) {
- fallbackRatio
- } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
- // there is likely an estimation error, so we fallback
- fallbackRatio
- } else {
- 1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble
- }
- case _ => fallbackRatio
+ // the filtering ratio derived from column statistics, when reliable stats are available
+ val statsBasedRatio: Option[Double] =
+ (partExpr.references.toList, otherExpr.references.toList) match {
+ // filter out expressions with more than one attribute on any side of the operator
+ case (leftAttr :: Nil, rightAttr :: Nil)
+ if conf.dynamicPartitionPruningUseStats =>
+ // get the CBO stats for each attribute in the join condition
+ val partDistinctCount = distinctCounts(leftAttr, partPlan)
You're right, and thanks for confirming it predates this PR --
`getFilterableTableScan` resolves the expression through projections/aliases
but the NDV lookup still pairs the original join key against the leaf's
`AttributeMap`. It's a general DPP stats-estimation gap that only ever costs a
missed optimization, never correctness, and this PR doesn't touch that lookup.
I'd prefer to fix it on its own (carrying the resolved leaf attribute into the
lookup) so the improvement benefits all of DPP rather than expanding this
follow-up. Out of scope here, but a good separate change.
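The lineage gap described above can be illustrated with a toy model. Spark's `AttributeMap` is keyed by expression IDs, and an alias produces an attribute with a new ID, so looking up the join-side attribute can miss the statistics recorded for the leaf attribute. The sketch below is only an illustration under simplifying assumptions: names stand in for expression IDs, each projection is a plain `Map` from output name to input name, and `LineageLookupSketch`, `AliasLayer`, and its helpers are hypothetical. It does not describe how `getFilterableTableScan` works internally.

```scala
// Hypothetical toy model: projection layers that rename attributes, and NDV
// statistics keyed by the leaf (scan) attribute name. Names stand in for the
// expression IDs that Spark's AttributeMap actually uses.
object LineageLookupSketch {

  /** One projection layer: output name -> input name it aliases. */
  type AliasLayer = Map[String, String]

  /** Walk the layers top-down, following aliases to the leaf attribute name. */
  def resolveToLeaf(key: String, layersTopDown: List[AliasLayer]): String =
    layersTopDown.foldLeft(key)((name, layer) => layer.getOrElse(name, name))

  /** Lookup keyed by the original join key: misses when the key was aliased. */
  def ndvByJoinKey(key: String, leafStats: Map[String, BigInt]): Option[BigInt] =
    leafStats.get(key)

  /** Lookup keyed by the resolved leaf attribute, as suggested in the comment. */
  def ndvByResolvedKey(
      key: String,
      layersTopDown: List[AliasLayer],
      leafStats: Map[String, BigInt]): Option[BigInt] =
    leafStats.get(resolveToLeaf(key, layersTopDown))
}
```

For example, with a single layer `Map("part_key" -> "p")` and leaf statistics `Map("p" -> BigInt(1000))`, the lookup by `part_key` finds nothing while the lookup through the resolved leaf attribute finds the NDV of 1000. Either way the worst case is a missed optimization, which matches the point above that the gap never affects correctness.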
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
* in bytes of the plan on the other side of the join. We estimate the filtering ratio
* using column statistics if they are available, otherwise we use the config value of
* `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+ *
+ * The fallback ratio is only meaningful "when CBO stats are missing, but there is a predicate
+ * that is likely to be selective" -- so it is used only when `hasSelectivePredicate` is true. A
+ * filtering side that is eligible only because it is already materialized (a LocalRelation or a
+ * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; for it we rely solely on
+ * the statistics-based ratio and report no benefit when statistics are unavailable, so it is not
+ * injected as a standalone always-applied subquery on a guessed ratio. A statistics-based ratio,
+ * when available, is always honored regardless of `hasSelectivePredicate`.
*/
private def pruningHasBenefit(
partExpr: Expression,
partPlan: LogicalPlan,
otherExpr: Expression,
- otherPlan: LogicalPlan): Boolean = {
+ otherPlan: LogicalPlan,
+ hasSelectivePredicate: Boolean): Boolean = {
// get the distinct counts of an attribute for a given table
def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
}
- // the default filtering ratio when CBO stats are missing, but there is a
- // predicate that is likely to be selective
- val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
- // the filtering ratio based on the type of the join condition and on the column statistics
- val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match {
- // filter out expressions with more than one attribute on any side of the operator
- case (leftAttr :: Nil, rightAttr :: Nil)
- if conf.dynamicPartitionPruningUseStats =>
- // get the CBO stats for each attribute in the join condition
- val partDistinctCount = distinctCounts(leftAttr, partPlan)
- val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
- val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
- otherDistinctCount.isDefined
- if (!availableStats) {
- fallbackRatio
- } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
- // there is likely an estimation error, so we fallback
- fallbackRatio
- } else {
- 1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble
- }
- case _ => fallbackRatio
+ // the filtering ratio derived from column statistics, when reliable stats are available
+ val statsBasedRatio: Option[Double] =
+ (partExpr.references.toList, otherExpr.references.toList) match {
+ // filter out expressions with more than one attribute on any side of the operator
+ case (leftAttr :: Nil, rightAttr :: Nil)
+ if conf.dynamicPartitionPruningUseStats =>
+ // get the CBO stats for each attribute in the join condition
+ val partDistinctCount = distinctCounts(leftAttr, partPlan)
+ // A materialized filtering side (e.g. a LocalRelation) may carry no column statistics
+ // but an exact `maxRows`, which is a conservative upper bound on its join-key NDV. Use
+ // it when the column statistic is missing so a small, selective materialized side still
+ // yields a statistics-based ratio rather than falling through to the gated fallback.
+ val otherDistinctCount =
+ distinctCounts(rightAttr, otherPlan).orElse(otherPlan.maxRows.map(BigInt(_)))
+ val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
+ otherDistinctCount.isDefined
+ if (!availableStats) {
+ None
+ } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
+ // there is likely an estimation error, so there is no reliable stats-based ratio
+ None
+ } else {
+ Some(1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble)
Review Comment:
Agreed, it over-estimates when the key is a transformation like `p % 2`, and,
as you note, it predates this PR -- it's a pre-existing imprecision in
`statsBasedRatio` that the selective-predicate path shares. The effect is a
no-benefit subquery (wasted work), not a wrong result. It's really a general
benefit-estimation refinement (use the column NDV only when the resolved key is
itself an `Attribute`, or use expression-level NDV), orthogonal to this PR, so
I'd like to leave it out of here and handle it separately along with the
lineage lookup above.
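A small worked example of the over-estimate, assuming the partition-side key resolves to `p % 2` over a partition column `p` with 1000 distinct values, and the other side contributes 10 distinct key values. The formula `1 - otherNdv / partNdv` from the hunk gives about 0.99, but `p % 2` only takes the values 0 and 1, so the real pruning depends on which of those two values the other side produces, not on the NDV of `p`. `TransformedKeySketch` and its functions are hypothetical helpers for this illustration only.

```scala
// Hypothetical illustration: compare the stats-based estimate, which uses the
// NDV of the underlying column p, with the pruning actually achieved when the
// partition-side join key is p % m.
object TransformedKeySketch {

  /** Stats-based ratio formula from the hunk: 1 - otherNdv / partNdv. */
  def ratio(partNdv: Double, otherNdv: Double): Double = 1 - otherNdv / partNdv

  /** Fraction of partitions pruned when the key is p % m and only partitions
   *  whose key appears among the other side's key values are kept. */
  def actualPrunedFraction(partitions: Seq[Int], m: Int, keysFromOtherSide: Set[Int]): Double = {
    val kept = partitions.count(p => keysFromOtherSide.contains(p % m))
    1 - kept.toDouble / partitions.size
  }
}
```

With other-side keys `0 until 10`, every partition matches (since `p % 2` is always 0 or 1), so nothing is pruned against an estimate of about 0.99; with other-side keys `Set(0)`, half of the partitions are pruned. In both cases the result is wasted work rather than a wrong answer, consistent with the comment above.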
--
This is an automated message from the Apache Git Service.