cloud-fan commented on code in PR #49715:
URL: https://github.com/apache/spark/pull/49715#discussion_r1940512013
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -366,58 +372,52 @@ case class AdaptiveSparkPlanExec(
if (errors.nonEmpty) {
cleanUpAndThrowException(errors.toSeq, None)
}
-
- // Try re-optimizing and re-planning. Adopt the new plan if its cost
is equal to or less
- // than that of the current plan; otherwise keep the current physical
plan together with
- // the current logical plan since the physical plan's logical links
point to the logical
- // plan it has originated from.
- // Meanwhile, we keep a list of the query stages that have been
created since last plan
- // update, which stands for the "semantic gap" between the current
logical and physical
- // plans. And each time before re-planning, we replace the
corresponding nodes in the
- // current logical plan with logical query stages to make it
semantically in sync with
- // the current physical plan. Once a new plan is adopted and both
logical and physical
- // plans are updated, we can clear the query stage list because at
this point the two plans
- // are semantically and physically in sync again.
- val logicalPlan =
replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
- val afterReOptimize = reOptimize(logicalPlan)
- if (afterReOptimize.isDefined) {
- val (newPhysicalPlan, newLogicalPlan) = afterReOptimize.get
- val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
- val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
- if (newCost < origCost ||
- (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
- lazy val plans =
- sideBySide(currentPhysicalPlan.treeString,
newPhysicalPlan.treeString).mkString("\n")
- logOnLevel(log"Plan changed:\n${MDC(QUERY_PLAN, plans)}")
- cleanUpTempTags(newPhysicalPlan)
- currentPhysicalPlan = newPhysicalPlan
- currentLogicalPlan = newLogicalPlan
- stagesToReplace = Seq.empty[QueryStageExec]
+ if (!currentPhysicalPlan.isInstanceOf[ResultQueryStageExec]) {
+ // Try re-optimizing and re-planning. Adopt the new plan if its cost
is equal to or less
+ // than that of the current plan; otherwise keep the current
physical plan together with
+ // the current logical plan since the physical plan's logical links
point to the logical
+ // plan it has originated from.
+ // Meanwhile, we keep a list of the query stages that have been
created since last plan
+ // update, which stands for the "semantic gap" between the current
logical and physical
+ // plans. And each time before re-planning, we replace the
corresponding nodes in the
+ // current logical plan with logical query stages to make it
semantically in sync with
+ // the current physical plan. Once a new plan is adopted and both
logical and physical
+ // plans are updated, we can clear the query stage list because at
this point the two
+ // plans are semantically and physically in sync again.
+ val logicalPlan =
replaceWithQueryStagesInLogicalPlan(currentLogicalPlan,
+ stagesToReplace.toSeq)
+ val afterReOptimize = reOptimize(logicalPlan)
+ if (afterReOptimize.isDefined) {
+ val (newPhysicalPlan, newLogicalPlan) = afterReOptimize.get
+ val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
+ val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
+ if (newCost < origCost ||
+ (newCost == origCost && currentPhysicalPlan != newPhysicalPlan))
{
+ lazy val plans = sideBySide(
+ currentPhysicalPlan.treeString,
newPhysicalPlan.treeString).mkString("\n")
+ logOnLevel(log"Plan changed:\n${MDC(QUERY_PLAN, plans)}")
+ cleanUpTempTags(newPhysicalPlan)
+ currentPhysicalPlan = newPhysicalPlan
+ currentLogicalPlan = newLogicalPlan
+ stagesToReplace.clear()
+ }
}
}
// Now that some stages have finished, we can try creating new stages.
- result = createQueryStages(currentPhysicalPlan)
+ result = createQueryStages(fun, currentPhysicalPlan, false)
}
-
- ruleContext = ruleContext.withFinalStage(isFinalStage = true)
- // Run the final plan when there's no more unfinished stages.
- currentPhysicalPlan = applyPhysicalRulesWithRuleContext(
- optimizeQueryStage(result.newPlan, isFinalStage = true),
- postStageCreationRules(supportsColumnar),
- Some((planChangeLogger, "AQE Post Stage Creation")))
- ruleContext.clearConfigs()
- _isFinalPlan = true
- executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
- currentPhysicalPlan
}
+ _isFinalPlan = true
+ finalPlanUpdate
+
currentPhysicalPlan.asInstanceOf[ResultQueryStageExec].resultOption.get().get.asInstanceOf[T]
}
// Use a lazy val to avoid this being called more than once.
@transient private lazy val finalPlanUpdate: Unit = {
// Subqueries that don't belong to any query stage of the main query will
execute after the
// last UI update in `getFinalPhysicalPlan`, so we need to update UI here
again to make sure
// the newly generated nodes of those subqueries are updated.
- if (shouldUpdatePlan && currentPhysicalPlan.exists(_.subqueries.nonEmpty))
{
+ if (shouldUpdatePlan) {
Review Comment:
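Can we update the comment above to reflect this change? It still explains the UI update only in terms of subqueries, but the condition no longer checks `currentPhysicalPlan.exists(_.subqueries.nonEmpty)`.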
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]