cloud-fan commented on code in PR #56603:
URL: https://github.com/apache/spark/pull/56603#discussion_r3457210968
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
    * in bytes of the plan on the other side of the join. We estimate the filtering ratio
    * using column statistics if they are available, otherwise we use the config value of
    * `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+   *
+   * The fallback ratio is only meaningful "when CBO stats are missing, but there is a predicate
+   * that is likely to be selective" -- so it is used only when `hasSelectivePredicate` is true. A
+   * filtering side that is eligible only because it is already materialized (a LocalRelation or a
+   * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; for it we rely solely on
+   * the statistics-based ratio and report no benefit when statistics are unavailable, so it is not
+   * injected as a standalone always-applied subquery on a guessed ratio. A statistics-based ratio,
+   * when available, is always honored regardless of `hasSelectivePredicate`.
    */
   private def pruningHasBenefit(
       partExpr: Expression,
       partPlan: LogicalPlan,
       otherExpr: Expression,
-      otherPlan: LogicalPlan): Boolean = {
+      otherPlan: LogicalPlan,
+      hasSelectivePredicate: Boolean): Boolean = {
     // get the distinct counts of an attribute for a given table
     def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
       plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
     }
-    // the default filtering ratio when CBO stats are missing, but there is a
-    // predicate that is likely to be selective
-    val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
-    // the filtering ratio based on the type of the join condition and on the column statistics
-    val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match {
-      // filter out expressions with more than one attribute on any side of the operator
-      case (leftAttr :: Nil, rightAttr :: Nil)
-        if conf.dynamicPartitionPruningUseStats =>
-          // get the CBO stats for each attribute in the join condition
-          val partDistinctCount = distinctCounts(leftAttr, partPlan)
-          val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
-          val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
-            otherDistinctCount.isDefined
-          if (!availableStats) {
-            fallbackRatio
-          } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
-            // there is likely an estimation error, so we fallback
-            fallbackRatio
-          } else {
-            1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble
-          }
-      case _ => fallbackRatio
+    // the filtering ratio derived from column statistics, when reliable stats are available
+    val statsBasedRatio: Option[Double] =
+      (partExpr.references.toList, otherExpr.references.toList) match {
+        // filter out expressions with more than one attribute on any side of the operator
+        case (leftAttr :: Nil, rightAttr :: Nil)
+          if conf.dynamicPartitionPruningUseStats =>
+            // get the CBO stats for each attribute in the join condition
+            val partDistinctCount = distinctCounts(leftAttr, partPlan)
Review Comment:
Thanks for re-verifying. I'd still keep this out of the follow-up. This is a
missed optimization, not a slowdown or a correctness issue, and it is
consistent with how DPP already behaves today: the lineage miss is total and
pre-existing. `statsBasedRatio` never resolves an aliased partition key, on
master or here, and the selective-predicate path hits the identical miss. In
your repro the base's standalone DPP comes from the ungated fallback *guess*,
not from stats; gating that guess for a predicate-less materialized side is
exactly this PR's intent, and the materialized side ends up treated the same as
any other side with no usable stats and no selective predicate.

Carrying the expression resolved by `findExpressionAndTrackLineageDown` into
the leaf NDV lookup is a real improvement, but it benefits all of DPP and
touches `getFilterableTableScan`, which this PR doesn't. It should be fixed
separately.
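
For reference, the gating described above can be sketched in isolation. This is
a minimal standalone model, not the PR's code: `effectiveRatio` and
`FallbackGatingSketch` are hypothetical names, and the fallback value `0.5` is
only an illustrative input. It shows that a stats-based ratio always wins, and
that the guessed fallback is only reachable behind a selective predicate.

```scala
object FallbackGatingSketch {
  /** Hypothetical helper (not a Spark API): picks the filtering ratio to use, if any. */
  def effectiveRatio(
      statsBasedRatio: Option[Double],
      hasSelectivePredicate: Boolean,
      fallbackRatio: Double): Option[Double] = {
    // A statistics-based ratio is honored regardless of hasSelectivePredicate.
    // The guessed fallback is allowed only when a selective predicate exists.
    statsBasedRatio.orElse(if (hasSelectivePredicate) Some(fallbackRatio) else None)
  }

  def main(args: Array[String]): Unit = {
    val fallback = 0.5 // illustrative value, assumed for this sketch
    // Stats available, no selective predicate: stats-based ratio is used.
    println(effectiveRatio(Some(0.9), hasSelectivePredicate = false, fallback))
    // No stats, selective predicate: the fallback guess is used.
    println(effectiveRatio(None, hasSelectivePredicate = true, fallback))
    // No stats, no selective predicate (materialized side): no ratio, no benefit.
    println(effectiveRatio(None, hasSelectivePredicate = false, fallback))
  }
}
```

The third case is the one under discussion: when the aliased key defeats the
NDV lookup, the materialized side lands there, exactly like any other side
without stats or a selective predicate.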
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
+            // A materialized filtering side (e.g. a LocalRelation) may carry no column statistics
+            // but an exact `maxRows`, which is a conservative upper bound on its join-key NDV. Use
+            // it when the column statistic is missing so a small, selective materialized side still
+            // yields a statistics-based ratio rather than falling through to the gated fallback.
+            val otherDistinctCount =
+              distinctCounts(rightAttr, otherPlan).orElse(otherPlan.maxRows.map(BigInt(_)))
+            val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
+              otherDistinctCount.isDefined
+            if (!availableStats) {
+              None
+            } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
+              // there is likely an estimation error, so there is no reliable stats-based ratio
+              None
+            } else {
+              Some(1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble)
Review Comment:
Thanks, agreed it over-estimates for a transformed key like `p % 2`, but this
produces a wasted no-benefit subquery, not a wrong result, and it's the same
imprecision DPP already has today: `statsBasedRatio` uses `NDV(p)` for
`p % 2` for *any* relation that carries an NDV stat, so this wasted subquery
already exists on master for a stats-bearing side joined on a transformed key.
The `maxRows` bound this PR adds is sound (it bounds the filtering side's NDV);
the over-estimate is entirely on the partition side and predates the PR.
`maxRows` only lets a no-stats materialized side reach the same estimate every
stats-bearing relation already gets, so the feature is consistent with DPP
today rather than worse. And because the `maxRows` path is gated on
`isCheaplyRecomputableMaterializedPlan`, even a mis-estimated subquery
re-executes only a cheap side.

The partition-side fix (use the leaf column NDV only when the resolved key
is itself an `Attribute`, or use an expression-level NDV) is a general
`statsBasedRatio` improvement that applies to the selective-predicate path too.
It should be fixed separately.
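
To make the over-estimate concrete, here is a small standalone model of the
ratio computation from the diff, with the `maxRows` fallback for the filtering
side. It is a sketch under assumptions: `StatsRatioSketch` and its parameter
names are hypothetical, the plan and attribute lookups are replaced by plain
`Option` inputs, and the numbers (`NDV(p) = 8`, `maxRows = 2`) are made up for
illustration.

```scala
object StatsRatioSketch {
  /** Hypothetical stand-in for the stats-based ratio, with plain Option inputs. */
  def statsBasedRatio(
      partNdv: Option[BigInt],
      otherNdv: Option[BigInt],
      otherMaxRows: Option[Long]): Option[Double] = {
    // Use the column NDV when present; otherwise fall back to maxRows, which
    // is an upper bound on the filtering side's join-key NDV.
    val otherBound = otherNdv.orElse(otherMaxRows.map(BigInt(_)))
    (partNdv, otherBound) match {
      // A reliable ratio needs a positive partition-side NDV that exceeds the bound.
      case (Some(p), Some(o)) if p > 0 && p > o => Some(1 - o.toDouble / p.toDouble)
      case _ => None
    }
  }

  def main(args: Array[String]): Unit = {
    // Join key is `p % 2`, but the lookup sees the leaf column: NDV(p) = 8.
    // The filtering side has no column stats, only maxRows = 2.
    println(statsBasedRatio(Some(BigInt(8)), None, Some(2L))) // Some(0.75)
    // With the true NDV of `p % 2` (2 distinct values) there is no reliable ratio.
    println(statsBasedRatio(Some(BigInt(2)), None, Some(2L))) // None
  }
}
```

The gap between the two calls comes entirely from the partition-side input;
the `maxRows` bound on the filtering side is the same in both.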