peter-toth commented on code in PR #56275:
URL: https://github.com/apache/spark/pull/56275#discussion_r3374308996


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -271,119 +279,124 @@ case class AdaptiveSparkPlanExec(
     // `plan.queryExecution.rdd`, we need to set active session here as new 
plan nodes can be
     // created in the middle of the execution.
     context.session.withActive {
-      val executionId = getExecutionId
-      // Use inputPlan logicalLink here in case some top level physical nodes 
may be removed
-      // during `initialPlan`
-      var currentLogicalPlan = inputPlan.logicalLink.get
-      var result = createQueryStages(fun, currentPhysicalPlan, firstRun = true)
-      val events = new LinkedBlockingQueue[StageMaterializationEvent]()
-      val errors = new mutable.ArrayBuffer[Throwable]()
-      var stagesToReplace = Seq.empty[QueryStageExec]
-      while (!result.allChildStagesMaterialized) {
-        currentPhysicalPlan = result.newPlan
-        if (result.newStages.nonEmpty) {
-          stagesToReplace = result.newStages ++ stagesToReplace
-          executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))
-
-          // SPARK-33933: we should submit tasks of broadcast stages first, to 
avoid waiting
-          // for tasks to be scheduled and leading to broadcast timeout.
-          // This partial fix only guarantees the start of materialization for 
BroadcastQueryStage
-          // is prior to others, but because the submission of collect job for 
broadcasting is
-          // running in another thread, the issue is not completely resolved.
-          val reorderedNewStages = result.newStages
-            .sortWith {
-              case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => 
false
-              case (_: BroadcastQueryStageExec, _) => true
-              case _ => false
-            }
+      // Record this node's planning rules into its own tracker; merged into 
the query's tracker by
+      // the main node in `finalPlanUpdate`.
+      QueryPlanningTracker.withTracker(tracker) {

Review Comment:
   No meaningful change, just wrapping into `QueryPlanningTracker.withTracker`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to