ulysses-you commented on a change in pull request #32742:
URL: https://github.com/apache/spark/pull/32742#discussion_r644593517



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -603,6 +603,19 @@ case class AdaptiveSparkPlanExec(
     (newPlan, optimized)
   }
 
+  /**
+   * Clean up logical plan stats before re-optimization.
+   */
+  private def cleanupStats(logicalPlan: LogicalPlan): Unit = {
+    logicalPlan.invalidateStatsCache()
+    // We must invalidate ineffective rules before re-optimizing, since the AQE
+    // Optimizer may introduce a LocalRelation that can affect the result.

Review comment:
       Let's say we have a complex plan:
   * two joins with two exchanges
   * join2's right side is empty
   
   the logical join looks like:
   ```
   Join2
   :- Join1
   :  :  +- Relation parquet
   :  +- LogicalQueryStage Project, BroadcastQueryStage 0
   +- LogicalQueryStage Relation parquet, ShuffleQueryStage 1
   ```
   
   * first round: QueryStage 0 is materialized and QueryStage 1 is not materialized. The AQE Optimizer can only optimize join1, but unfortunately the rule has no effect there. At this moment we would mark the rule as ineffective.
   * second round: QueryStage 1 is materialized. The AQE Optimizer can now optimize join2, but the rule has already been marked as ineffective. As a result we can't optimize it.
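   To make the two rounds concrete, here is a minimal, self-contained sketch of the problem. Note this is not Spark's actual `TreeNode` bitmask implementation; `Node`, `applyRule`, and the rule name are illustrative stand-ins. It shows how a stale "ineffective" mark from round one blocks the rule in round two unless it is cleared first:
   
   ```scala
   object IneffectiveRuleSketch {
     final case class Node(name: String, var ineffective: Set[String] = Set.empty)
   
     // Apply `ruleId` to `node` unless it was already marked ineffective there.
     // Returns true if the rule actually fired.
     def applyRule(node: Node, ruleId: String, fires: Boolean): Boolean = {
       if (node.ineffective.contains(ruleId)) {
         false                                  // skipped: stale "ineffective" mark
       } else if (!fires) {
         node.ineffective += ruleId             // no effect: mark rule ineffective
         false
       } else {
         true                                   // rule rewrites the node
       }
     }
   
     def main(args: Array[String]): Unit = {
       val join2 = Node("Join2")
       // Round 1: the right-side stage is not materialized yet, so the rule
       // cannot fire and gets marked ineffective on this node.
       assert(!applyRule(join2, "PropagateEmptyRelation", fires = false))
       // Round 2: the stage is now materialized as empty, but the stale mark
       // makes the rule get skipped even though it could fire.
       assert(!applyRule(join2, "PropagateEmptyRelation", fires = true))
       // Clearing the mark before re-optimization lets round 2 succeed.
       join2.ineffective = Set.empty
       assert(applyRule(join2, "PropagateEmptyRelation", fires = true))
       println("ok")
     }
   }
   ```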
   
   

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
##########
@@ -53,8 +54,8 @@ object AQEPropagateEmptyRelation extends 
PropagateEmptyRelationBase {
       empty(j)
   }
 
-  // TODO we need use transformUpWithPruning instead of transformUp
-  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
+    _.containsAnyPattern(LOGICAL_QUERY_STAGE, LOCAL_RELATION, 
TRUE_OR_FALSE_LITERAL), ruleId) {

Review comment:
       yes, in the latest version the rule is run in a `fixedPoint` batch.
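   For context, a `fixedPoint` batch simply reapplies its rules until the plan stops changing or an iteration limit is hit. A minimal sketch under that assumption (not Spark's `RuleExecutor`; the toy string "plan" is purely illustrative):
   
   ```scala
   object FixedPointSketch {
     // Reapply `rule` until the plan stops changing or maxIterations is hit,
     // mirroring what a fixedPoint batch does in a rule executor.
     def fixedPoint[A](plan: A, maxIterations: Int)(rule: A => A): A = {
       var current = plan
       var changed = true
       var i = 0
       while (changed && i < maxIterations) {
         val next = rule(current)
         changed = next != current
         current = next
         i += 1
       }
       current
     }
   
     def main(args: Array[String]): Unit = {
       // Toy "plan": collapsing duplicate letters needs several passes on "aaabb",
       // which is exactly the situation a fixedPoint batch handles.
       val result = fixedPoint("aaabb", maxIterations = 10)(s => s.replaceAll("(.)\\1", "$1"))
       assert(result == "ab")
       println(result)
     }
   }
   ```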

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -603,6 +603,19 @@ case class AdaptiveSparkPlanExec(
     (newPlan, optimized)
   }
 
+  /**
+   * Clean up logical plan stats before re-optimization.
+   */
+  private def cleanupStats(logicalPlan: LogicalPlan): Unit = {
+    logicalPlan.invalidateStatsCache()
+    // We must invalidate ineffective rules before re-optimizing, since the AQE
+    // Optimizer may introduce a LocalRelation that can affect the result.

Review comment:
       It seems meaningless to do `transformUpWithPruning` on the AQE side. I see what you're worried about, but there is another issue:
   
   We now reuse rules that already use `transformUpWithPruning` between the normal and the AQE Optimizer, and we may introduce more such rules in the future. If we don't invalidate ineffective rules, we would need to make a new rule for every existing rule. Or could we use another tag to distinguish where a rule is used?
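   For readers unfamiliar with the pruning mechanism: `transformUpWithPruning` skips whole subtrees whose pattern bits don't match the requested mask. A minimal sketch of the idea (the `Node` class, the bit constants, and the `visited` counter are illustrative stand-ins, not Spark's `TreePattern`/`TreeNode` code):
   
   ```scala
   object PruningSketch {
     // Tree patterns as bit positions, mirroring the idea behind pattern bitsets.
     val LOCAL_RELATION: Long = 1L << 0
     val LOGICAL_QUERY_STAGE: Long = 1L << 1
   
     final case class Node(name: String, selfPatterns: Long, children: Seq[Node] = Nil) {
       // A node's pattern bits cover its entire subtree.
       lazy val patternBits: Long = children.foldLeft(selfPatterns)(_ | _.patternBits)
       def containsAnyPattern(mask: Long): Boolean = (patternBits & mask) != 0L
     }
   
     var visited = 0
   
     // Bottom-up transform that skips subtrees lacking any requested pattern.
     def transformUpWithPruning(node: Node, mask: Long)(rule: Node => Node): Node = {
       if (!node.containsAnyPattern(mask)) {
         node                                    // pruned: nothing to match below
       } else {
         visited += 1
         val newChildren = node.children.map(transformUpWithPruning(_, mask)(rule))
         rule(node.copy(children = newChildren))
       }
     }
   
     def main(args: Array[String]): Unit = {
       visited = 0
       val plan = Node("Join", 0L, Seq(
         Node("Relation", 0L),                       // plain relation: pruned
         Node("Stage", LOGICAL_QUERY_STAGE)))        // carries a query-stage bit
       transformUpWithPruning(plan, LOGICAL_QUERY_STAGE | LOCAL_RELATION)(identity)
       // Only Join and Stage are visited; the Relation subtree is skipped.
       assert(visited == 2)
       println(visited)
     }
   }
   ```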




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


