cloud-fan commented on code in PR #49715:
URL: https://github.com/apache/spark/pull/49715#discussion_r1942374247


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -268,25 +276,23 @@ case class AdaptiveSparkPlanExec(
 
   def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
 
-  private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
-    if (isFinalPlan) return currentPhysicalPlan
-
+  /**
+   * Run `fun` on finalized physical plan
+   */
+  def withFinalPlanUpdate[T](fun: SparkPlan => T): T = lock.synchronized {
+    _isFinalPlan = false
     // In case of this adaptive plan being executed out of `withActive` scoped 
functions, e.g.,
     // `plan.queryExecution.rdd`, we need to set active session here as new 
plan nodes can be
     // created in the middle of the execution.
     context.session.withActive {
       val executionId = getExecutionId
-      // Use inputPlan logicalLink here in case some top level physical nodes 
may be removed
-      // during `initialPlan`
-      var currentLogicalPlan = inputPlan.logicalLink.get
-      var result = createQueryStages(currentPhysicalPlan)
+      var result = createQueryStages(fun, currentPhysicalPlan, true)

Review Comment:
   ```suggestion
         var result = createQueryStages(fun, currentPhysicalPlan, firstRun = 
true)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to