sigmod commented on a change in pull request #32742:
URL: https://github.com/apache/spark/pull/32742#discussion_r644555757
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -603,6 +603,19 @@ case class AdaptiveSparkPlanExec(
(newPlan, optimized)
}
+ /**
+ * Clean up logical plan stats before re-optimize
+ */
+ private def cleanupStats(logicalPlan: LogicalPlan): Unit = {
+ logicalPlan.invalidateStatsCache()
+ // We must invalidate ineffective rules before re-optimize since AQE Optimizer may introduce
+ // LocalRelation that can affect result.
Review comment:
Why do we need to invalidate ineffective rules? If the AQE Optimizer
rewrote a leaf node to `LocalRelation`, the path from the new `LocalRelation`
to the new root only contains *new* TreeNode instances, in which the
ineffective rule bits are not set.
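To make that concrete, here is a minimal, self-contained sketch of the
bookkeeping in question (toy classes and a hand-rolled `transformUp`, not
Spark's actual `TreeNode`): each node remembers which rule ids were no-ops on
it, and freshly constructed nodes start with an empty set, so the rule is
never skipped on the new `LocalRelation` path.

```scala
import scala.collection.immutable.BitSet

// Toy plan nodes; `ineffective` stands in for TreeNode's ineffective-rule bitset.
sealed trait Node {
  var ineffective: BitSet = BitSet.empty
  def children: Seq[Node]
}
final case class Scan(table: String) extends Node { val children: Seq[Node] = Nil }
final case class LocalRel(rows: Int) extends Node { val children: Seq[Node] = Nil }
final case class Filter(child: Node) extends Node { val children: Seq[Node] = Seq(child) }

object IneffectiveBitsDemo {
  // Bottom-up rewrite that skips nodes already marked ineffective for `ruleId`
  // and marks nodes on which the rule turns out to be a no-op.
  def transformUp(node: Node, ruleId: Int)(rule: PartialFunction[Node, Node]): Node = {
    if (node.ineffective.contains(ruleId)) return node
    val rebuilt = node match {
      case Filter(child) => Filter(transformUp(child, ruleId)(rule))
      case leaf => leaf
    }
    val result = rule.applyOrElse(rebuilt, identity[Node])
    if (result eq rebuilt) result.ineffective += ruleId // no change: remember it
    result
  }

  def main(args: Array[String]): Unit = {
    val ruleId = 1
    val collapse: PartialFunction[Node, Node] = { case Filter(LocalRel(0)) => LocalRel(0) }

    val plan = Filter(Scan("t"))
    val once = transformUp(plan, ruleId)(collapse)
    println(once.ineffective.contains(ruleId)) // true: a second run over `once` is skipped

    // A later rewrite replaces the leaf with LocalRel; every node on that path
    // is new, so no ineffective bits are set and the rule still fires there.
    val rewritten = Filter(LocalRel(0))
    println(rewritten.ineffective.contains(ruleId)) // false
    println(transformUp(rewritten, ruleId)(collapse)) // LocalRel(0)
  }
}
```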
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
##########
@@ -53,8 +54,8 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
empty(j)
}
- // TODO we need use transformUpWithPruning instead of transformUp
- def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
+ _.containsAnyPattern(LOGICAL_QUERY_STAGE, LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
Review comment:
Can AQEPropagateEmptyRelation be invoked multiple times on the same tree
or subtree?
If not, then we don't need "ruleId".
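For context, the `cond` argument alone already prunes work regardless of
whether `ruleId` is passed. A rough, self-contained sketch of that mechanism
(toy classes and hypothetical pattern tags, not Spark's `TreePattern` /
`TreeNode` implementation):

```scala
object PruningSketch {
  // Hypothetical pattern tags standing in for Spark's TreePattern enum values.
  sealed trait Pattern
  case object LocalRelationPattern extends Pattern
  case object LogicalQueryStagePattern extends Pattern

  final case class Plan(name: String, patterns: Set[Pattern], children: Seq[Plan] = Nil) {
    // A node's bits are its own patterns plus those of all descendants.
    lazy val treePatternBits: Set[Pattern] =
      children.foldLeft(patterns)(_ ++ _.treePatternBits)
    def containsAnyPattern(ps: Pattern*): Boolean = ps.exists(treePatternBits.contains)
  }

  var ruleInvocations = 0

  // Bottom-up transform that skips whole subtrees failing the pattern precondition.
  def transformUpWithPruning(plan: Plan)(cond: Plan => Boolean)(
      rule: PartialFunction[Plan, Plan]): Plan = {
    if (!cond(plan)) plan // nothing in this subtree can match; don't even recurse
    else {
      val rebuilt =
        plan.copy(children = plan.children.map(c => transformUpWithPruning(c)(cond)(rule)))
      ruleInvocations += 1
      rule.applyOrElse(rebuilt, identity[Plan])
    }
  }

  def main(args: Array[String]): Unit = {
    val plan = Plan("Join", Set.empty, Seq(
      Plan("Scan", Set.empty),
      Plan("LocalRelation", Set(LocalRelationPattern))))

    val pruned = transformUpWithPruning(plan)(
        _.containsAnyPattern(LocalRelationPattern, LogicalQueryStagePattern)) {
      case p @ Plan("LocalRelation", _, _) => p.copy(name = "EmptyRelation")
    }
    println(pruned.children.map(_.name)) // List(Scan, EmptyRelation)
    println(ruleInvocations)             // 2 -- the Scan subtree was never visited
  }
}
```

The rule body is only invoked on subtrees whose pattern bits can possibly
match; `ruleId` is an orthogonal shortcut that only pays off when the same
rule is re-applied to an unchanged (sub)tree.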
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
##########
@@ -53,8 +54,8 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
empty(j)
}
- // TODO we need use transformUpWithPruning instead of transformUp
- def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
+ _.containsAnyPattern(LOGICAL_QUERY_STAGE, LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) {
Review comment:
Ah, that makes sense. I somehow read it as being in a run-once batch.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -603,6 +603,19 @@ case class AdaptiveSparkPlanExec(
(newPlan, optimized)
}
+ /**
+ * Clean up logical plan stats before re-optimize
+ */
+ private def cleanupStats(logicalPlan: LogicalPlan): Unit = {
+ logicalPlan.invalidateStatsCache()
+ // We must invalidate ineffective rules before re-optimize since AQE Optimizer may introduce
+ // LocalRelation that can affect result.
Review comment:
Ok, IIUC, whether a stage is "materialized or not" is kept as varying
external state outside of the plan nodes?
If that's the case, the same rule object is invoked multiple times on the
same logical plan, which violates the contract for passing `ruleId`s:
https://github.com/databricks/runtime/blob/88acfbbb14f1396c312c394ed8a2a645738f83f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L445-L446
I'd prefer we keep ineffective rule bits internal to TreeNodes for
simplicity; otherwise it might be difficult to reason about certain behaviors.
I suspect we don't have that many fix-point iterations in the AQE Optimizer,
so passing `ruleId` doesn't help that much? Where `ruleId` helps most is in
Analyzer rules, because the Analyzer fix-point batch can run 5~8 iterations.
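To spell out the hazard with a minimal, self-contained sketch (toy classes,
not Spark code): if a rule's output also depends on state outside the tree,
such as which stages have materialized, then memoizing "this rule was a no-op
on this node" by `ruleId` can make a later run skip a rewrite it should have
performed.

```scala
import scala.collection.mutable

object RuleIdContractSketch {
  final case class Stage(id: Int)
  // External, mutable knowledge about which stages have finished materializing.
  val materialized = mutable.Set.empty[Int]

  final case class Plan(label: String, stage: Option[Stage] = None)

  // Per-node memo of rule ids that were no-ops last time (toy stand-in for
  // TreeNode's ineffective-rule bitset).
  val ineffective = mutable.Map.empty[Plan, Set[Int]].withDefaultValue(Set.empty)

  def runRule(plan: Plan, ruleId: Int)(rule: Plan => Plan): Plan = {
    if (ineffective(plan).contains(ruleId)) plan      // skip: marked as a no-op before
    else {
      val out = rule(plan)
      if (out == plan) ineffective(plan) += ruleId    // record the no-op
      out
    }
  }

  // Rule: once a stage is materialized and known to be empty, collapse it.
  def collapseEmptyStage(p: Plan): Plan = p match {
    case Plan(_, Some(s)) if materialized(s.id) => Plan("LocalRelation(empty)")
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val ruleId = 42
    val plan = Plan("QueryStage", Some(Stage(1)))
    println(runRule(plan, ruleId)(collapseEmptyStage)) // unchanged: stage not finished yet
    materialized += 1                                  // external state changes, the plan does not
    println(runRule(plan, ruleId)(collapseEmptyStage)) // still unchanged: wrongly skipped by the memo
    println(collapseEmptyStage(plan))                  // Plan(LocalRelation(empty),None): what we wanted
  }
}
```

The tree is identical across the two runs; only the external state has
changed, which is exactly the situation the `ruleId` contract in the linked
comment warns about.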
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -603,6 +603,19 @@ case class AdaptiveSparkPlanExec(
(newPlan, optimized)
}
+ /**
+ * Clean up logical plan stats before re-optimize
+ */
+ private def cleanupStats(logicalPlan: LogicalPlan): Unit = {
+ logicalPlan.invalidateStatsCache()
+ // We must invalidate ineffective rules before re-optimize since AQE Optimizer may introduce
+ // LocalRelation that can affect result.
Review comment:
> Now we reuse the rule, which already uses `transformUpWithPruning`, between
> the normal and the AQE Optimizer, and maybe we will introduce other rules in
> the future.
Can you elaborate a bit more? I'm not sure I understand the exact issue here.
IIUC, the main issue is that `invalidateStatsCache` breaks the immutability
property of LogicalPlans and SparkPlans?
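For what it's worth, here is a small sketch of that immutability concern (toy
classes, not Spark's `LogicalPlanStats`): the stats live in a mutable cache
field on an otherwise immutable node, so invalidating the cache from outside
mutates every plan tree that shares the node, and an answer computed earlier
can silently change.

```scala
object StatsCacheSketch {
  final case class Statistics(rowCount: Long)

  final class Relation(val name: String, currentRowCount: () => Long) {
    // Mutable cache on an otherwise immutable node.
    private var statsCache: Option[Statistics] = None
    def stats: Statistics = {
      if (statsCache.isEmpty) statsCache = Some(Statistics(currentRowCount()))
      statsCache.get
    }
    def invalidateStatsCache(): Unit = statsCache = None
  }

  def main(args: Array[String]): Unit = {
    var runtimeRows = 1000L              // stands in for runtime statistics in AQE
    val rel = new Relation("t", () => runtimeRows)
    val sharedByAnotherPlan = rel        // the same node can appear in several plan trees

    println(rel.stats)                   // Statistics(1000), now cached
    runtimeRows = 0                      // runtime info improves after a stage finishes
    println(sharedByAnotherPlan.stats)   // still Statistics(1000): stale cache
    rel.invalidateStatsCache()           // external mutation of a "frozen" plan node
    println(sharedByAnotherPlan.stats)   // Statistics(0): the shared node changed under us
  }
}
```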
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]