viirya commented on a change in pull request #31167: URL: https://github.com/apache/spark/pull/31167#discussion_r559286684
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -190,7 +191,36 @@ case class AdaptiveSparkPlanExec(
           executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))
           // Start materialization of all new stages and fail fast if any stages failed eagerly
-          result.newStages.foreach { stage =>
+
+          // SPARK-33933: we should materialize broadcast stages first and wait the
+          // materialization finish before materialize other stages, to avoid waiting
+          // for broadcast tasks to be scheduled and leading to broadcast timeout.
+          val broadcastMaterializationFutures = result.newStages
+            .filter(_.isInstanceOf[BroadcastQueryStageExec])
+            .map { stage =>
+              var future: Future[Any] = null
+              try {
+                future = stage.materialize()
+                future.onComplete { res =>
+                  if (res.isSuccess) {
+                    events.offer(StageSuccess(stage, res.get))
+                  } else {
+                    events.offer(StageFailure(stage, res.failed.get))
+                  }
+                }(AdaptiveSparkPlanExec.executionContext)
+              } catch {
+                case e: Throwable =>
+                  cleanUpAndThrowException(Seq(e), Some(stage.id))
+              }
+              future
+            }
+
+          // Wait for the materialization of all broadcast stages finish

Review comment:
   No. For a normal query without AQE, the broadcast job is triggered while preparing the SparkPlan (please see `BroadcastExchangeExec.relationFuture`), and it is a separate job from the job of the query itself. We _don't_ wait at that point. That is the main difference from your change here.

   Immediately after triggering the broadcast job, Spark continues with the other parts of the query until it _really_ needs the broadcast result (please see `executeBroadcast`). But here you wait for the materialization of the broadcast stages to finish. So even if there are still resources available to run other shuffle stages, they won't be run. This differs from both the current AQE and the non-AQE query execution.
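To make the ordering difference concrete, here is a minimal sketch using plain Scala futures. It is not Spark code: `materializeBroadcast`, `materializeShuffle`, and the two driver methods are hypothetical stand-ins, and the log only records the order in which the calling thread submits work and blocks.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BroadcastWaitSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-ins for stage materialization; these are NOT Spark APIs.
  private def materializeBroadcast(): Future[String] =
    Future { Thread.sleep(50); "broadcast-relation" }

  private def materializeShuffle(id: Int): Future[String] =
    Future { s"shuffle-$id" }

  // Trigger the broadcast, keep submitting other work, and block only
  // at the point where the broadcast result is actually needed.
  def waitOnlyWhenNeeded(): List[String] = {
    val log = ArrayBuffer.empty[String]
    log += "submit broadcast"
    val broadcast = materializeBroadcast()
    log += "submit shuffles"
    val shuffles = Future.sequence(Seq(1, 2).map(materializeShuffle))
    Await.result(broadcast, 10.seconds)
    log += "broadcast result available"
    Await.result(shuffles, 10.seconds)
    log.toList
  }

  // Block on the broadcast before submitting anything else, so the
  // shuffle work cannot start even if resources are idle.
  def waitBeforeOtherStages(): List[String] = {
    val log = ArrayBuffer.empty[String]
    log += "submit broadcast"
    val broadcast = materializeBroadcast()
    Await.result(broadcast, 10.seconds)
    log += "broadcast result available"
    log += "submit shuffles"
    val shuffles = Future.sequence(Seq(1, 2).map(materializeShuffle))
    Await.result(shuffles, 10.seconds)
    log.toList
  }
}
```

In the first method the shuffle work is submitted while the broadcast is still in flight. In the second, nothing else is submitted until the broadcast completes, which is the behaviour the comment objects to.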