cloud-fan commented on code in PR #56603:
URL: https://github.com/apache/spark/pull/56603#discussion_r3456998338
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -134,45 +134,63 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
* in bytes of the plan on the other side of the join. We estimate the filtering ratio
* using column statistics if they are available, otherwise we use the config value of
* `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio`.
+ *
+ * The fallback ratio is only meaningful "when CBO stats are missing, but there is a predicate
+ * that is likely to be selective" -- so it is used only when `hasSelectivePredicate` is true. A
+ * filtering side that is eligible only because it is already materialized (a LocalRelation or a
+ * checkpoint-derived LogicalRDD, SPARK-54593) carries no such predicate; for it we rely solely on
+ * the statistics-based ratio and report no benefit when statistics are unavailable, so it is not
+ * injected as a standalone always-applied subquery on a guessed ratio. A statistics-based ratio,
+ * when available, is always honored regardless of `hasSelectivePredicate`.
*/
private def pruningHasBenefit(
partExpr: Expression,
partPlan: LogicalPlan,
otherExpr: Expression,
- otherPlan: LogicalPlan): Boolean = {
+ otherPlan: LogicalPlan,
+ hasSelectivePredicate: Boolean): Boolean = {
// get the distinct counts of an attribute for a given table
def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
plan.stats.attributeStats.get(attr).flatMap(_.distinctCount)
}
- // the default filtering ratio when CBO stats are missing, but there is a
- // predicate that is likely to be selective
- val fallbackRatio = conf.dynamicPartitionPruningFallbackFilterRatio
- // the filtering ratio based on the type of the join condition and on the column statistics
- val filterRatio = (partExpr.references.toList, otherExpr.references.toList) match {
- // filter out expressions with more than one attribute on any side of the operator
- case (leftAttr :: Nil, rightAttr :: Nil)
- if conf.dynamicPartitionPruningUseStats =>
- // get the CBO stats for each attribute in the join condition
- val partDistinctCount = distinctCounts(leftAttr, partPlan)
- val otherDistinctCount = distinctCounts(rightAttr, otherPlan)
- val availableStats = partDistinctCount.isDefined && partDistinctCount.get > 0 &&
- otherDistinctCount.isDefined
- if (!availableStats) {
- fallbackRatio
- } else if (partDistinctCount.get.toDouble <= otherDistinctCount.get.toDouble) {
- // there is likely an estimation error, so we fallback
- fallbackRatio
- } else {
- 1 - otherDistinctCount.get.toDouble / partDistinctCount.get.toDouble
- }
- case _ => fallbackRatio
+ // the filtering ratio derived from column statistics, when reliable stats are available
+ val statsBasedRatio: Option[Double] =
+ (partExpr.references.toList, otherExpr.references.toList) match {
+ // filter out expressions with more than one attribute on any side of the operator
+ case (leftAttr :: Nil, rightAttr :: Nil)
+ if conf.dynamicPartitionPruningUseStats =>
+ // get the CBO stats for each attribute in the join condition
+ val partDistinctCount = distinctCounts(leftAttr, partPlan)
+ // A materialized filtering side (e.g. a LocalRelation) may carry no column statistics
+ // but an exact `maxRows`, which is a conservative upper bound on its join-key NDV. Use
+ // it when the column statistic is missing so a small, selective materialized side still
+ // yields a statistics-based ratio rather than falling through to the gated fallback.
+ val otherDistinctCount =
+ distinctCounts(rightAttr, otherPlan).orElse(otherPlan.maxRows.map(BigInt(_)))
Review Comment:
Fixed in 896ace23. You're right that this is a regression introduced by this
PR in the `fallbackFilterRatio=0` configuration, distinct from the general
hidden-non-determinism limitation. The `maxRows` bound I added in `6d2c13c` was
applied to *any* filtering side. An opaque plan (the stateful `mapPartitions`)
that becomes DPP-eligible through its selective `filter(p>0)` could therefore
derive a benefit from the `limit(1)`-supplied `maxRows` and get a standalone,
always-applied subquery. That subquery re-evaluates the side, which drops the
rows that the join's second evaluation needs.
I now gate the `maxRows` bound on
`isCheaplyRecomputableMaterializedPlan(otherPlan)`, exactly as you suggested. A
side that isn't cheaply recomputable (an opaque `mapPartitions`, or a `limit`
above one) no longer contributes a `maxRows`-based ratio, so with no usable
stats it reports no benefit and isn't injected standalone; the intended
`LocalRelation` / checkpointed `LogicalRDD` cases are unaffected. This isn't a
general repeatability gate in the benefit estimator -- `maxRows` is only a
sound NDV bound for a materialized/bounded side in the first place, so the
bound belongs behind the same eligibility predicate.
Added a regression test (`mappedKeys.filter($"p" > 0).limit(1)` with
`fallbackFilterRatio=0`): it asserts no standalone DPP and a correct result. I
verified it fails on the prior revision with exactly your symptom (`ArraySeq()
did not equal List([1])` -- the empty double-evaluation result) and passes now.
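For readers following along, the gating described in this comment can be sketched without any Spark dependency. This is only an illustrative model, not the code in the PR: `FilteringSide`, `joinKeyNdv`, `cheaplyRecomputable`, and the `0.0` "no benefit" ratio are hypothetical names and simplifications, and the rule that an implausible estimate yields no ratio is an assumption carried over from the removed code's "estimation error" branch.

```scala
object DppBenefitSketch {
  // Hypothetical, simplified stand-in for the filtering side of the join;
  // this is not Spark's LogicalPlan.
  final case class FilteringSide(
      joinKeyNdv: Option[BigInt],   // NDV from column statistics, if present
      maxRows: Option[Long],        // upper bound on the row count, if present
      cheaplyRecomputable: Boolean) // assumed result of the eligibility predicate

  // NDV estimate for the filtering side: prefer column statistics and use
  // maxRows only when the side is cheaply recomputable.
  def otherDistinctCount(side: FilteringSide): Option[BigInt] =
    side.joinKeyNdv.orElse(
      if (side.cheaplyRecomputable) side.maxRows.map(BigInt(_)) else None)

  // Statistics-based ratio, defined only when both counts are usable and the
  // partitioned side has strictly more distinct values (assumption: an
  // implausible estimate yields None instead of a guessed ratio).
  def statsBasedRatio(partNdv: Option[BigInt], side: FilteringSide): Option[Double] =
    for {
      p <- partNdv
      if p > 0
      o <- otherDistinctCount(side)
      if p > o
    } yield 1.0 - o.toDouble / p.toDouble

  // Final ratio: a statistics-based ratio always wins; the fallback applies
  // only with a selective predicate; otherwise report no benefit (0.0 here).
  def filterRatio(
      partNdv: Option[BigInt],
      side: FilteringSide,
      hasSelectivePredicate: Boolean,
      fallbackRatio: Double): Double =
    statsBasedRatio(partNdv, side).getOrElse(
      if (hasSelectivePredicate) fallbackRatio else 0.0)
}
```

In this model, a side with `maxRows = Some(1)` and `cheaplyRecomputable = false` gets ratio `0.0` under `fallbackRatio = 0.0` even with a selective predicate, which mirrors the regression case; flipping `cheaplyRecomputable` to `true` against a partition NDV of 4 gives `0.75`.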
##########
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala:
##########
@@ -1880,33 +1992,20 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
DppMaterializedInputTestState.reset(counterId)
assert(df.collect().toSeq === Seq(Row(1)))
assert(activeDppSubqueries(df).isEmpty,
- s"Shouldn't trigger DPP for an opaque materialized plan:\n" +
+ s"Shouldn't execute standalone DPP for this filtering side:\n" +
df.queryExecution)
}
+ // A LocalRelation is cheaply recomputable and exposes an exact maxRows, a conservative
+ // bound on its join-key NDV, so its pruning benefit is estimable and it gets a standalone
+ // DPP subquery.
checkStandaloneDpp(Seq(1).toDF("p"))
Review Comment:
Good catch -- restored in 896ace23. You're right that the merge resolution
dropped the coverage for the cheap `Filter`/`Project` (eligible), `Aggregate`
(excluded), and UDF-projection (excluded) branches of
`isCheaplyRecomputableMaterializedPlan`. Those fixtures previously forced a
benefit through `fallbackFilterRatio=1000`, which this PR now gates, so I
couldn't keep them as standalone-DPP cases (a checkpointed `Seq(1)` has neither
stats nor `maxRows`).
Following your suggestion, I re-established them through **broadcast
reuse**, which isolates the eligibility decision from the benefit term: a
cheaply-recomputable `Filter`/`Project` over a checkpointed input reuses the
join's broadcast for DPP, while the `Aggregate` and UDF-projection sides are
not DPP-eligible even with a reusable broadcast present. So a regression that
wrongly admits `Aggregate`/UDF (or drops the cheap `Filter`/`Project`) now
fails CI again.
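As a rough illustration of why broadcast reuse separates the two concerns, the decision can be modelled as below. This is a deliberately simplified sketch and not Spark's planner code: the `Outcome` type, the `decide` function, and its three boolean inputs are hypothetical, and the ordering of the checks is an assumption made for the example.

```scala
object DppDecisionSketch {
  // Hypothetical outcomes for one candidate filtering side.
  sealed trait Outcome
  case object ReuseBroadcast extends Outcome     // prune using the join's broadcast
  case object StandaloneSubquery extends Outcome // prune with a separate subquery
  case object NoPruning extends Outcome

  // Eligibility is checked first and independently of the benefit term, so a
  // test with a reusable broadcast exercises eligibility alone.
  def decide(eligible: Boolean, broadcastReusable: Boolean, hasBenefit: Boolean): Outcome =
    if (!eligible) NoPruning
    else if (broadcastReusable) ReuseBroadcast
    else if (hasBenefit) StandaloneSubquery
    else NoPruning
}
```

Under this model, an eligible `Filter`/`Project` side with no estimable benefit still yields `ReuseBroadcast` when the broadcast is reusable, while an ineligible `Aggregate` or UDF-projection side yields `NoPruning` regardless of the other two inputs.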