zhongyu09 commented on a change in pull request #31167:
URL: https://github.com/apache/spark/pull/31167#discussion_r559281993



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -190,7 +191,36 @@ case class AdaptiveSparkPlanExec(
           executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))
 
           // Start materialization of all new stages and fail fast if any stages failed eagerly
-          result.newStages.foreach { stage =>
+
+          // SPARK-33933: we should materialize broadcast stages first and wait the
+          // materialization finish before materialize other stages, to avoid waiting
+          // for broadcast tasks to be scheduled and leading to broadcast timeout.
+          val broadcastMaterializationFutures = result.newStages
+            .filter(_.isInstanceOf[BroadcastQueryStageExec])
+            .map { stage =>
+            var future: Future[Any] = null
+            try {
+              future = stage.materialize()
+              future.onComplete { res =>
+                if (res.isSuccess) {
+                  events.offer(StageSuccess(stage, res.get))
+                } else {
+                  events.offer(StageFailure(stage, res.failed.get))
+                }
+              }(AdaptiveSparkPlanExec.executionContext)
+            } catch {
+              case e: Throwable =>
+                cleanUpAndThrowException(Seq(e), Some(stage.id))
+            }
+            future
+          }
+
+          // Wait for the materialization of all broadcast stages finish

Review comment:
       I think we are talking about the same thing. For non-AQE, there is usually one job per query. So when we execute the query and transform it to an RDD, we need the broadcast result. This is the place I mentioned: before submitting the job for the query, we wait for the result of the broadcast.
   
   Here's part of the log I printed:
   `
   1610935584070 [Thread[ScalaTest-run-running-AdaptiveQueryExecSuite,5,main]] class org.apache.spark.sql.execution.WholeStageCodegenExec SparkPlan.prepare
   1610935584072 BroadcastExchangeExec.doPrepare()
   1610935584073 beforeCollect
   1610935584074 [Thread[broadcast-exchange-0,5,main]] class org.apache.spark.sql.execution.CoalesceExec SparkPlan.prepare
   1610935584163 sc.runJob
   1610935584223 DAGScheduler.runJob
   1610935584224 DAGScheduler.submitJob Id = 0
   1610935584225 eventProcessLoop.post JobSubmitted
   1610935597943 beforeBuild
   1610935597982 promise.trySuccess(broadcasted)
   1610935597988 [Thread[ScalaTest-run-running-AdaptiveQueryExecSuite,5,main]] class org.apache.spark.sql.execution.InputAdapter SparkPlan.prepare
   1610935598184 sc.runJob 
   1610935598194 DAGScheduler.runJob 
   1610935598195 DAGScheduler.submitJob Id = 1 
   1610935598195 eventProcessLoop.post JobSubmitted 
   `
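
   The ordering in that log (job 0 for the broadcast is submitted and finishes, and only then job 1 for the query is submitted) can be sketched outside Spark roughly like this. This is a toy model using plain Scala futures, not the actual Spark code path: `BroadcastOrderingSketch`, `runBroadcastJob`, `runQuery` and the event strings are all made-up names for illustration.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BroadcastOrderingSketch {
  // Hypothetical stand-in for the printed log; not a Spark API.
  val events = new ConcurrentLinkedQueue[String]()

  // Hypothetical stand-in for the broadcast job (job 0 in the log),
  // which runs on a separate thread.
  def runBroadcastJob()(implicit ec: ExecutionContext): Future[Seq[Int]] = Future {
    events.add("submit job 0 (broadcast)")
    val rows = Seq(1, 2, 3) // pretend these are the collected rows
    events.add("broadcast built")
    rows
  }

  // Models the non-AQE ordering described above: the calling thread blocks
  // on the broadcast result before it submits the job for the query itself.
  def runQuery(): Int = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val broadcastFuture = runBroadcastJob()
    val broadcasted = Await.result(broadcastFuture, 10.seconds)
    events.add("submit job 1 (main query)")
    broadcasted.sum
  }

  def main(args: Array[String]): Unit = {
    val result = runQuery()
    println(events.toArray.mkString(" -> "))
    println(result)
  }
}
```

   Because the main job is only submitted after `Await.result` returns, it cannot compete with the broadcast job for scheduling in this model, which is the property the comment above is pointing at.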



