MaxGekk commented on code in PR #56071:
URL: https://github.com/apache/spark/pull/56071#discussion_r3387798443
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -212,10 +218,10 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
* To be able to prune partitions on a join key, the filtering side needs to
* meet the following requirements:
* (1) it can not be a stream
- * (2) it needs to contain a selective predicate used for filtering
+ * (2) it needs to contain a selective predicate or a materialized input with known statistics
Review Comment:
"with known statistics" is not enforced by the code. The actual gate is
`isCheckpointedInput`, a boolean flag that is independent of whether
statistics are available. A checkpointed `LogicalRDD` with `originStats = None`
still qualifies (it falls back to `defaultSizeInBytes` in `computeStats()`), so
the qualifier is misleading.
```suggestion
* (2) it needs to contain a selective predicate or a materialized input
```
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -199,11 +200,16 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
/**
- * Search a filtering predicate in a given logical plan
+ * Search for a selective filtering operation or a precomputed input carrying source statistics.
Review Comment:
"carrying source statistics" mischaracterizes the criterion. The function
checks materialization (`LocalRelation` holds in-memory rows;
`LogicalRDD.isCheckpointedInput` flags a checkpoint), not the quality of
statistics. The second sentence of the Scaladoc already captures the real
property correctly: "without evaluating an upstream computation again."
```suggestion
* Search for a selective filtering operation or a precomputed, materialized input.
```
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -199,11 +200,16 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
/**
- * Search a filtering predicate in a given logical plan
+ * Search for a selective filtering operation or a precomputed input carrying source statistics.
+ *
+ * A LocalRelation, or a LogicalRDD created by checkpoint() or localCheckpoint(), can be reused
+ * as a broadcast build side for DPP without evaluating an upstream computation again.
*/
- private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
+ private def hasSelectivePredicateOrMaterializedInput(plan: LogicalPlan): Boolean = {
Review Comment:
`InMemoryRelation` (the result of `df.cache()` / `df.persist()`) is also a
materialized input that does not require upstream recomputation, yet it is
excluded from this check. The same file already treats `InMemoryRelation` as
zero-overhead in `calculatePlanOverhead`, so it is acknowledged as
already-materialized for cost purposes.
In practice: `sampledKeys.cache()` followed by a partitioned join does NOT
get DPP without a filter, while `sampledKeys.localCheckpoint()` in the same
pattern DOES, even though both are materialized. Is this intentional?
If so, the Scaladoc should document why (one plausible rationale:
`InMemoryRelation` can be evicted under memory pressure and re-executed, while
a checkpoint is immutable for the query's lifetime). If not, adding `case _:
InMemoryRelation => true` would make the behavior consistent.
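To make the asymmetry concrete, here is a minimal, self-contained sketch of the gate. It is a toy model only: `Plan`, `Filter`, `Project`, `Scan`, and the `acceptCached` switch are hypothetical stand-ins invented for illustration, and the case classes merely borrow the names of Spark's `LocalRelation`, `LogicalRDD`, and `InMemoryRelation`. It does not reproduce the actual body of `hasSelectivePredicateOrMaterializedInput`.

```scala
// Toy stand-ins for logical plan nodes. These are NOT Spark's real classes.
sealed trait Plan { def children: Seq[Plan] }

// `selective` models "this filter is likely to reduce the row count" (assumed flag).
final case class Filter(selective: Boolean, child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
final case class Project(child: Plan) extends Plan {
  def children: Seq[Plan] = Seq(child)
}
case object LocalRelation extends Plan { val children: Seq[Plan] = Nil }
final case class LogicalRDD(isCheckpointedInput: Boolean) extends Plan {
  val children: Seq[Plan] = Nil
}
case object InMemoryRelation extends Plan { val children: Seq[Plan] = Nil }
case object Scan extends Plan { val children: Seq[Plan] = Nil }

object DppGate {
  // acceptCached = false models the behavior described in this comment;
  // acceptCached = true models adding `case _: InMemoryRelation => true`.
  def qualifies(plan: Plan, acceptCached: Boolean): Boolean = plan match {
    case Filter(true, _)          => true
    case LocalRelation            => true
    case LogicalRDD(checkpointed) => checkpointed
    case InMemoryRelation         => acceptCached
    case other                    => other.children.exists(qualifies(_, acceptCached))
  }
}
```

With `acceptCached = false`, `Project(InMemoryRelation)` is rejected while `Project(LogicalRDD(isCheckpointedInput = true))` is accepted, which mirrors the `cache()` versus `localCheckpoint()` difference described above. Flipping the switch to `true` makes the two cases behave the same.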