ulysses-you commented on a change in pull request #32742:
URL: https://github.com/apache/spark/pull/32742#discussion_r644593517
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -603,6 +603,19 @@ case class AdaptiveSparkPlanExec(
(newPlan, optimized)
}
+ /**
+ * Clean up logical plan stats before re-optimize
+ */
+ private def cleanupStats(logicalPlan: LogicalPlan): Unit = {
+ logicalPlan.invalidateStatsCache()
+ // We must invalidate ineffective rules before re-optimize since AQE Optimizer may introduce
+ // LocalRelation that can affect result.
Review comment:
Let's say we have a complex plan:
* two joins with two exchanges
* join2's right side is empty

The logical plan looks like:
```
Join2
:- Join1
:  :- Relation parquet
:  +- LogicalQueryStage Project, BroadcastQueryStage 0
+- LogicalQueryStage Relation parquet, ShuffleQueryStage 1
```
* First round: QueryStage 0 is materialized and QueryStage 1 is not. The AQE Optimizer can only optimize join1, but unfortunately the rule has no effect, so at this moment we mark the rule as ineffective.
* Second round: QueryStage 1 is materialized. Now the AQE Optimizer could optimize join2, but the rule has already been marked as ineffective. As a result we can't optimize it.
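The two-round problem above can be simulated with a small, self-contained sketch. This is plain Scala, not Spark's actual rule-executor API; all names here are illustrative:

```scala
object IneffectiveRuleDemo {
  // Tracks whether the rule was marked ineffective in an earlier round.
  var markedIneffective = false

  // Toy rule: rewrite a join whose side is known empty into a LocalRelation.
  private def rule(plan: List[String]): List[String] =
    plan.map(node => if (node == "Join2-empty") "LocalRelation" else node)

  // Run the rule the way a naive executor would: skip it once marked ineffective.
  def runRule(plan: List[String]): List[String] = {
    if (markedIneffective) plan
    else {
      val result = rule(plan)
      if (result == plan) markedIneffective = true // no effect in this round
      result
    }
  }
}
```

In round one the rule matches nothing, so it gets marked ineffective; in round two the empty side is finally visible, but the stale marker makes the executor skip the rewrite, which is exactly why the marker must be invalidated between rounds.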
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
##########
@@ -53,8 +54,8 @@ object AQEPropagateEmptyRelation extends
PropagateEmptyRelationBase {
empty(j)
}
- // TODO we need use transformUpWithPruning instead of transformUp
- def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
+   _.containsAnyPattern(LOGICAL_QUERY_STAGE, LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
Review comment:
Yes, in the latest version the rule is used in a `fixedPoint` batch.
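For context, a `fixedPoint` batch just re-applies its rules until the plan stops changing (or an iteration cap is hit). A minimal sketch of that loop, in plain Scala with no Spark dependencies:

```scala
object FixedPointDemo {
  // Apply `rule` repeatedly until it no longer changes the plan,
  // bounded by `maxIterations` as a safety net against non-converging rules.
  def fixedPoint[A](plan: A, rule: A => A, maxIterations: Int = 100): A = {
    var current = plan
    var iterations = 0
    var changed = true
    while (changed && iterations < maxIterations) {
      val next = rule(current)
      changed = next != current
      current = next
      iterations += 1
    }
    current
  }
}
```

Because a fixed-point batch keeps firing until nothing changes, pruning with `transformUpWithPruning` matters more there: without it, every extra iteration re-traverses the whole tree.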
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -603,6 +603,19 @@ case class AdaptiveSparkPlanExec(
(newPlan, optimized)
}
+ /**
+ * Clean up logical plan stats before re-optimize
+ */
+ private def cleanupStats(logicalPlan: LogicalPlan): Unit = {
+ logicalPlan.invalidateStatsCache()
+ // We must invalidate ineffective rules before re-optimize since AQE Optimizer may introduce
+ // LocalRelation that can affect result.
Review comment:
Seems it's meaningless to do `transformUpWithPruning` on the AQE side. I see what you're worried about, but there is another issue: we now reuse rules that already use `transformUpWithPruning` between the normal Optimizer and the AQE Optimizer, and we may introduce more such rules in the future. If we don't invalidate ineffective rules, we would need to make a new copy of every existing rule. Or could we use another tag to distinguish where a rule is used?
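One way to read the suggestion above: before each re-optimization round, reset both the cached stats and the per-plan ineffective-rule markers, so a rule skipped in an earlier round can fire again once new query stages have materialized. A hypothetical sketch, where the names are illustrative rather than Spark's actual API (only the `invalidateStatsCache()` call appears in the diff):

```scala
object ReOptimizeCleanupDemo {
  // Stand-in for per-plan state that survives between AQE rounds.
  final case class PlanState(
      var cachedStats: Option[Long] = None,        // stands in for cached statistics
      var ineffectiveRules: Set[String] = Set.empty // rules marked as having no effect
  )

  // Cleanup to run before each re-optimization round.
  def cleanupBeforeReOptimize(state: PlanState): Unit = {
    state.cachedStats = None           // analogous to invalidateStatsCache()
    state.ineffectiveRules = Set.empty // let previously-skipped rules run again
  }
}
```

Clearing the markers here trades a little redundant work (re-running rules that may still have no effect) for correctness: a rule can no longer be permanently silenced by a round in which its trigger was simply not yet materialized.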
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]