cloud-fan commented on code in PR #56636:
URL: https://github.com/apache/spark/pull/56636#discussion_r3449740922
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -205,38 +205,47 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
}
/**
- * Returns whether a plan can be evaluated repeatedly from materialized inputs and produce the
- * same rows.
+ * Returns whether the filtering side is cheap enough to recompute that DPP is worthwhile even
+ * without a selective predicate: its cost is dominated by an already-materialized input, with
+ * only scan-cost-bound operators above it.
*
- * LocalRelation rows are already locally available. A checkpoint-derived LogicalRDD establishes
- * an explicit checkpoint boundary and can be used as a broadcast build side for DPP without
- * evaluating the computation upstream of that boundary again.
+ * This is the cost-side counterpart to `hasSelectivePredicate`. A selective predicate is
+ * evidence of a high pruning ratio (the benefit term of `pruningHasBenefit`); an
+ * already-materialized input is the complementary signal on the cost term -- a `LocalRelation`
+ * (rows already local) or a checkpoint-derived `LogicalRDD` (`isCheckpointedInput` requires the
+ * RDD to be actually checkpointed, so a lazy checkpoint does not qualify) is ~free to re-read,
+ * so even a modest pruning ratio clears the benefit bar. `InMemoryRelation` is excluded because
+ * cache()/persist() are lazy: its presence does not guarantee the data has been materialized,
+ * and missing or evicted blocks may require recomputing the upstream plan.
*
- * InMemoryRelation is intentionally excluded because cache() and persist() are lazy: its
- * presence does not guarantee the cached data has been materialized, and missing or evicted
- * blocks may require evaluating the upstream computation again.
+ * The operators above the materialized input are restricted to ones whose cost is dominated by
+ * their input's scan bytes -- the only cost `calculatePlanOverhead` can see. `Project`/`Filter`
+ * add negligible compute, a `Union`'s cost is the sum of its (materialized) children, and
+ * `SubqueryAlias` is a no-op. `Aggregate`, joins, and opaque RDD operators (e.g. `mapPartitions`)
+ * are excluded: they add compute or a shuffle the scan-bytes cost model cannot see, so treating
+ * such a side as a cheap materialized input would overstate the pruning benefit. A subquery in a
+ * `Project`/`Filter` is excluded for the same reason -- it embeds its own plan, whose recompute
+ * cost `calculatePlanOverhead` does not account for.
*
- * The supported operators are intentionally narrow. DPP is optional, and logical-plan
- * determinism does not cover user functions stored outside Catalyst expressions.
+ * This is a cost guard only; it does not check that re-evaluating the side yields the same rows.
+ * DPP duplicates the filtering side on every eligibility path and has always assumed it is
+ * repeatable, so a non-deterministic operator above the materialized input (e.g. a `rand()`
+ * projection) can still produce different keys on re-evaluation -- the same pre-existing,
+ * DPP-wide limitation the selective-predicate path carries, intentionally left to a future
+ * system-level design rather than patched piecemeal here. The one materialized-input-specific
+ * repeatability concern -- a checkpoint that has not been materialized yet -- is handled by
+ * `LogicalRDD.isCheckpointedInput` requiring the RDD to be actually checkpointed.
*/
- private def isRepeatableMaterializedPlan(plan: LogicalPlan): Boolean = {
- def isRepeatableExpression(expression: Expression): Boolean = {
- expression.deterministic && !SubqueryExpression.hasSubquery(expression) &&
- !expression.exists {
- case _: NonSQLExpression | _: UserDefinedExpression | _: UserDefinedGenerator => true
- case _ => false
- }
- }
-
+ private def isCheaplyRecomputableMaterializedPlan(plan: LogicalPlan): Boolean = {
plan match {
case _: LocalRelation => true
case r: LogicalRDD => r.isCheckpointedInput
- case Project(projectList, child) if projectList.forall(isRepeatableExpression) =>
- isRepeatableMaterializedPlan(child)
- case Filter(condition, child) if isRepeatableExpression(condition) =>
- isRepeatableMaterializedPlan(child)
- case u: Union => u.children.forall(isRepeatableMaterializedPlan)
- case SubqueryAlias(_, child) => isRepeatableMaterializedPlan(child)
+ case Project(projectList, child) if !projectList.exists(SubqueryExpression.hasSubquery) =>
Review Comment:
Good catch, and you're right that this is consistent with the cost framing
rather than the dropped determinism check. Restored the `NonSQLExpression` /
`UserDefinedExpression` / `UserDefinedGenerator` exclusion as an
`isScanCostBoundExpression` helper guarding the `Project`/`Filter` cases: an
opaque user function adds CPU/IO that `calculatePlanOverhead` can't see, so a
side carrying one no longer counts as a cheap materialized input. Added a
negative test for a UDF projection over a checkpoint to lock it in.
(3571d1218b8)
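A minimal sketch of the guard described above, on a toy model. `Expr`, `Plan`, `OpaqueUdf`, `ScalarSubquery`, `CheckpointRdd` and the other case classes are hypothetical stand-ins for the Catalyst types, not Spark's API, and the bodies illustrate the idea rather than reproduce the code in 3571d1218b8. It shows `Project`/`Filter` staying eligible only when their expressions contain neither a subquery nor opaque user code, so a UDF projection over a checkpoint is rejected.

```scala
// Hypothetical, simplified stand-ins for Catalyst expressions; not Spark's classes.
sealed trait Expr { def children: Seq[Expr] }
final case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
final case class Lit(value: Int) extends Expr { def children: Seq[Expr] = Nil }
final case class Gt(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
}
// Stands in for NonSQLExpression / UserDefinedExpression / UserDefinedGenerator.
final case class OpaqueUdf(name: String, args: Seq[Expr]) extends Expr {
  def children: Seq[Expr] = args
}
// Stands in for a SubqueryExpression, which embeds its own plan.
final case class ScalarSubquery(plan: Plan) extends Expr { def children: Seq[Expr] = Nil }

// Hypothetical stand-ins for the logical operators the guard inspects.
sealed trait Plan
case object LocalRel extends Plan
final case class CheckpointRdd(isCheckpointed: Boolean) extends Plan
final case class Project(exprs: Seq[Expr], child: Plan) extends Plan
final case class Filter(condition: Expr, child: Plan) extends Plan
final case class Union(children: Seq[Plan]) extends Plan
final case class SubqueryAlias(child: Plan) extends Plan
final case class Aggregate(child: Plan) extends Plan

// True if `p` holds for `e` or for any expression beneath it.
def containsNode(e: Expr)(p: Expr => Boolean): Boolean =
  p(e) || e.children.exists(c => containsNode(c)(p))

// Sketch of the isScanCostBoundExpression idea: no embedded plan and no opaque user code.
def isScanCostBoundExpression(e: Expr): Boolean = !containsNode(e) {
  case _: ScalarSubquery | _: OpaqueUdf => true
  case _                                => false
}

def isCheaplyRecomputable(plan: Plan): Boolean = plan match {
  case LocalRel            => true
  case CheckpointRdd(done) => done // a lazy checkpoint that is not materialized yet is rejected
  case Project(exprs, child) if exprs.forall(isScanCostBoundExpression) =>
    isCheaplyRecomputable(child)
  case Filter(cond, child) if isScanCostBoundExpression(cond) => isCheaplyRecomputable(child)
  case Union(children)      => children.forall(isCheaplyRecomputable)
  case SubqueryAlias(child) => isCheaplyRecomputable(child)
  case _                    => false // Aggregate, joins, opaque RDD operators, ...
}
```

The model keeps the shape of a recursive match over a narrow operator whitelist with a catch-all `false`, which is what makes such a guard conservative: any operator not listed, such as `Aggregate`, falls through to ineligible.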
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -205,38 +205,47 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
}
/**
+ * This is a cost guard only; it does not check that re-evaluating the side yields the same rows.
+ * DPP duplicates the filtering side on every eligibility path and has always assumed it is
Review Comment:
You're right, thanks -- the `rand()` example was inaccurate. Confirmed in
code: `PlanExpression.deterministic` folds in `plan.deterministic`
(subquery.scala:47-48), so a non-deterministic build query makes the
`DynamicPruningSubquery` non-deterministic;
`NodeWithOnlyDeterministicProjectAndFilter` then fails to match and
`CleanupDynamicPruningFilters` rewrites the dynamic predicate to `true` before
physical planning, so it's never re-evaluated. Rewrote the paragraph to
describe the residual limitation as *hidden* non-determinism (expressions whose
results vary but that are still marked deterministic), which the
opaque-expression exclusion above further narrows.
(3571d1218b8)
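The determinism chain in the reply can be pictured with a toy model. Everything below is hypothetical and simplified: the case objects are stand-ins named after the Spark concepts (not Spark's classes), the build-side plan is reduced to a list of expressions, and the `CleanupDynamicPruningFilters` pattern match is collapsed into a single determinism check (the real rule also drops pruning filters for other reasons). It shows why visible non-determinism such as `rand()` never reaches re-evaluation, while an opaque function that is marked deterministic does.

```scala
// Hypothetical, simplified model; these are not Spark's classes.
sealed trait Expr { def deterministic: Boolean }
case object PartitionKey extends Expr { val deterministic = true }
case object Rand extends Expr { val deterministic = false }
// An opaque function whose results vary but which is still marked deterministic.
case object HiddenlyRandomUdf extends Expr { val deterministic = true }
case object TrueLiteral extends Expr { val deterministic = true }

// Mirrors the idea that a plan-embedding expression is deterministic only if its embedded
// build-side plan is; here the "plan" is just the expressions it evaluates.
final case class DynamicPruningSubquery(pruningKey: Expr, buildSide: Seq[Expr]) extends Expr {
  def deterministic: Boolean = pruningKey.deterministic && buildSide.forall(_.deterministic)
}

// Collapses the cleanup into one check: a non-deterministic dynamic-pruning predicate is
// replaced by `true` before physical planning, so its build side is never re-evaluated.
def cleanupDynamicPruning(pred: Expr): Expr = pred match {
  case d: DynamicPruningSubquery if !d.deterministic => TrueLiteral
  case other                                         => other
}
```

In this model the `HiddenlyRandomUdf` case survives cleanup, which is the residual limitation the reply describes; excluding opaque expressions from the cheap-materialized path narrows how often that case can reach DPP.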
##########
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala:
##########
@@ -1880,13 +1880,24 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
DppMaterializedInputTestState.reset(counterId)
assert(df.collect().toSeq === Seq(Row(1)))
assert(activeDppSubqueries(df).isEmpty,
- s"Shouldn't trigger DPP for a non-repeatable materialized plan:\n" +
+ s"Shouldn't trigger DPP for an opaque materialized plan:\n" +
df.queryExecution)
}
checkStandaloneDpp(Seq(1).toDF("p"))
checkStandaloneDpp(Seq(1).toDF("p").localCheckpoint(eager = true))
+ // Cheap, scan-cost-bound operators above a materialized input stay eligible: their
+ // recompute cost is dominated by the materialized leaf that calculatePlanOverhead sees.
+ checkStandaloneDpp(
+ Seq(1).toDF("p").localCheckpoint(eager = true).filter($"p" > 0).select($"p"))
Review Comment:
Confirmed -- `p > 0` is a `BinaryComparison`, so `isLikelySelective` returns
true and the assertion passed via `hasSelectivePredicate` without ever
consulting the helper. Changed the predicate to `$"p".cast("boolean")`, which
`isLikelySelective` does not classify as selective, so eligibility now
genuinely comes from `isCheaplyRecomputableMaterializedPlan`. (3571d1218b8)
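To see why the original test never reached the helper, here is a toy model of a short-circuiting eligibility check. The predicate classes, `isLikelySelective` (loosely modeled on, and much smaller than, Spark's `PredicateHelper.isLikelySelective`) and `dppEligible` are hypothetical; in particular, treating eligibility as `selective || cheaplyRecomputable` is an assumption inferred from the reply, not code taken from the PR.

```scala
// Hypothetical, trimmed-down predicate model; not Spark's classes.
sealed trait Pred
final case class Col(name: String) extends Pred
final case class IntLit(value: Int) extends Pred
final case class GreaterThan(left: Pred, right: Pred) extends Pred // a BinaryComparison
final case class CastToBoolean(child: Pred) extends Pred          // not a comparison
final case class Not(child: Pred) extends Pred
final case class And(left: Pred, right: Pred) extends Pred
final case class Or(left: Pred, right: Pred) extends Pred

// A few shapes a likely-selective check might recognise; Spark's real helper covers more
// cases (IN, LIKE, string predicates, ...).
def isLikelySelective(e: Pred): Boolean = e match {
  case Not(child)     => isLikelySelective(child)
  case And(l, r)      => isLikelySelective(l) || isLikelySelective(r)
  case Or(l, r)       => isLikelySelective(l) && isLikelySelective(r)
  case _: GreaterThan => true
  case _              => false
}

// Assumed shape of the eligibility check: `||` short-circuits, so the materialized-plan
// helper is only consulted when no likely-selective predicate is found.
def dppEligible(condition: Pred, isCheaplyRecomputable: () => Boolean): Boolean =
  isLikelySelective(condition) || isCheaplyRecomputable()
```

With `p > 0` the first operand is already true and the helper is never called, so a passing assertion proves nothing about it; with a boolean cast the helper decides the outcome, which is what the revised test relies on.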
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]