MaxGekk commented on code in PR #56071:
URL: https://github.com/apache/spark/pull/56071#discussion_r3387798443


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -212,10 +218,10 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
    * To be able to prune partitions on a join key, the filtering side needs to
    * meet the following requirements:
    *   (1) it can not be a stream
-   *   (2) it needs to contain a selective predicate used for filtering
+   *   (2) it needs to contain a selective predicate or a materialized input with known statistics

Review Comment:
   "with known statistics" is not enforced by the code — the actual gate is 
`isCheckpointedInput`, a boolean flag that is independent of statistics 
availability. A checkpointed `LogicalRDD` with `originStats = None` still 
qualifies (falling back to `defaultSizeInBytes` in `computeStats()`). The 
qualifier is misleading.
   ```suggestion
      *   (2) it needs to contain a selective predicate or a materialized input
   ```
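To make the point concrete, here is a minimal, self-contained Scala model of the gate. It is a sketch under stated assumptions. `DppGateModel`, its case classes and `DefaultSizeInBytes` are hypothetical stand-ins, not Spark's `LogicalPlan` nodes or `SQLConf`. They mirror only the two properties at issue: the `isCheckpointedInput` flag and the optional `originStats`. In the model, a checkpointed `LogicalRDD` with `originStats = None` still passes the gate, and its size estimate falls back to the default.

```scala
// Hypothetical, simplified model of the DPP filtering-side gate.
// These case classes are NOT Spark's LogicalPlan nodes; they only mirror
// the checkpoint flag and the optional origin statistics.
object DppGateModel {
  // Stand-in for conf.defaultSizeInBytes (spark.sql.defaultSizeInBytes),
  // whose default in Spark is Long.MaxValue.
  val DefaultSizeInBytes: BigInt = BigInt(Long.MaxValue)

  sealed trait Plan
  final case class Filter(selective: Boolean, child: Plan) extends Plan
  final case class LocalRelation(numRows: Int) extends Plan
  final case class LogicalRDD(isCheckpointedInput: Boolean, originStats: Option[BigInt]) extends Plan
  final case class Scan(table: String) extends Plan

  // The gate is a boolean check on node type and checkpoint flag:
  // originStats is never consulted.
  def qualifies(plan: Plan): Boolean = plan match {
    case Filter(selective, child)      => selective || qualifies(child)
    case _: LocalRelation              => true
    case LogicalRDD(isCheckpointed, _) => isCheckpointed
    case _: Scan                       => false
  }

  // Mirrors LogicalRDD.computeStats(): use the origin statistics when present,
  // otherwise fall back to the default size.
  def sizeInBytes(rdd: LogicalRDD): BigInt =
    rdd.originStats.getOrElse(DefaultSizeInBytes)

  def main(args: Array[String]): Unit = {
    val noStats = LogicalRDD(isCheckpointedInput = true, originStats = None)
    println(s"qualifies=${qualifies(noStats)}, sizeInBytes=${sizeInBytes(noStats)}")
  }
}
```

Running `main` prints `qualifies=true` alongside the fallback size. This shows that the flag, not the availability of statistics, decides eligibility.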



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -199,11 +200,16 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
 
 
   /**
-   * Search a filtering predicate in a given logical plan
+   * Search for a selective filtering operation or a precomputed input carrying source statistics.

Review Comment:
   "carrying source statistics" mischaracterizes the criterion. The function 
checks materialization (`LocalRelation` holds in-memory rows; 
`LogicalRDD.isCheckpointedInput` flags a checkpoint), not statistics quality. 
The Scaladoc's second sentence already captures the real property correctly: "without 
evaluating an upstream computation again."
   ```suggestion
      * Search for a selective filtering operation or a precomputed, materialized input.
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala:
##########
@@ -199,11 +200,16 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
 
 
   /**
-   * Search a filtering predicate in a given logical plan
+   * Search for a selective filtering operation or a precomputed input carrying source statistics.
+   *
+   * A LocalRelation, or a LogicalRDD created by checkpoint() or localCheckpoint(), can be reused
+   * as a broadcast build side for DPP without evaluating an upstream computation again.
    */
-  private def hasSelectivePredicate(plan: LogicalPlan): Boolean = {
+  private def hasSelectivePredicateOrMaterializedInput(plan: LogicalPlan): Boolean = {

Review Comment:
   `InMemoryRelation` (the result of `df.cache()` / `df.persist()`) is also a 
materialized input that does not require upstream recomputation, yet it is 
excluded from this check. The same file already treats `InMemoryRelation` as 
zero-overhead in `calculatePlanOverhead`, so it is acknowledged as 
already-materialized for cost purposes.
   
   In practice: `sampledKeys.cache()` followed by a partitioned join does NOT 
get DPP without a filter; `sampledKeys.localCheckpoint()` in the same pattern 
DOES — despite both being materialized. Is this intentional?
   
   If so, the Scaladoc should document why (one plausible rationale: 
`InMemoryRelation` can be evicted under memory pressure and re-executed, while 
a checkpoint is immutable for the query's lifetime). If not, adding
`case _: InMemoryRelation => true` would make the behavior consistent.
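A similarly hypothetical model can frame the `InMemoryRelation` question. `currentGate` approximates the criterion described in the PR, and `proposedGate` adds the suggested `case _: InMemoryRelation => true`. The node names echo Spark's, but the classes are simplified stand-ins invented for illustration. The model treats `InMemoryRelation` as an opaque leaf, as Spark's logical `InMemoryRelation` also is, so a `Filter` inside the cached plan is not visible to the gate.

```scala
// Hypothetical, simplified model for discussion; not Spark's LogicalPlan classes.
object InMemoryRelationGateModel {
  sealed trait Plan
  final case class Filter(selective: Boolean, child: Plan) extends Plan
  final case class LocalRelation(numRows: Int) extends Plan
  final case class LogicalRDD(isCheckpointedInput: Boolean) extends Plan
  // Stand-in for the leaf that df.cache() / df.persist() places in the plan.
  // Modeled as opaque: the gate does not look inside cachedPlan.
  final case class InMemoryRelation(cachedPlan: Plan) extends Plan
  final case class Scan(table: String) extends Plan

  // Approximates the PR's criterion: a selective Filter, a LocalRelation,
  // or a checkpointed LogicalRDD.
  def currentGate(plan: Plan): Boolean = plan match {
    case Filter(selective, child)   => selective || currentGate(child)
    case _: LocalRelation           => true
    case LogicalRDD(isCheckpointed) => isCheckpointed
    case _                          => false // Scan and InMemoryRelation are rejected
  }

  // Same criterion plus the reviewer's suggested InMemoryRelation case.
  def proposedGate(plan: Plan): Boolean = plan match {
    case _: InMemoryRelation      => true
    case Filter(selective, child) => selective || proposedGate(child)
    case other                    => currentGate(other)
  }

  def main(args: Array[String]): Unit = {
    val cached = InMemoryRelation(Scan("sampled_keys"))       // like sampledKeys.cache()
    val checkpointed = LogicalRDD(isCheckpointedInput = true) // like sampledKeys.localCheckpoint()
    println(s"cache(): current=${currentGate(cached)}, proposed=${proposedGate(cached)}")
    println(s"localCheckpoint(): current=${currentGate(checkpointed)}, proposed=${proposedGate(checkpointed)}")
  }
}
```

Under `currentGate`, the cached plan is rejected and the checkpointed one is accepted, which reproduces the asymmetry described in the review. `proposedGate` accepts both. If eviction and recomputation are the reason for excluding cached data, keeping `currentGate` and documenting that rationale would be the consistent choice.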



-- 
This is an automated message from the Apache Git Service.