zhongyu09 opened a new pull request #31167: URL: https://github.com/apache/spark/pull/31167
### What changes were proposed in this pull request?

In `AdaptiveSparkPlanExec.getFinalPhysicalPlan`, when new stages are generated, materialize the `BroadcastQueryStage`s first and wait for their materialization to finish before materializing the other (`ShuffleQueryStage`) stages. This ensures the broadcast jobs are scheduled and finished before the map jobs, so they do not sit behind the map jobs long enough to hit the broadcast timeout. This matches the behavior of non-AQE queries.

Ideally we only need to guarantee that the broadcast jobs are *scheduled* before the map jobs. However, that ordering is hard to control and would require large changes to spark-core, so the trade-off made here is to wait for the broadcast jobs to finish before materializing the `ShuffleQueryStage`s.

Consider the following query, where `a` is a large table and `b` and `c` are very small in-memory dimension tables:

```
SELECT a.id, a.name, b.name, c.name, count(a.value)
FROM a
JOIN b ON a.id = b.id
JOIN c ON a.name = c.name
GROUP BY a.id
```

For non-AQE:
1. Run collect on `b`, then broadcast `b`.
2. Run collect on `c`, then broadcast `c`.
3. Submit the job, which contains 2 stages.

For current AQE:
1. Submit 3 jobs (the shuffle map stage for `a`, collect and broadcast `b`, collect and broadcast `c`) almost at the same time.
2. When all of them finish, run the result stage.

For AQE with this PR:
1. Submit 2 jobs (collect and broadcast `b`, collect and broadcast `c`) at the same time.
2. Wait for the broadcasts of `b` and `c` to finish.
3. Run the shuffle map stage.
4. Run the result stage.

### Why are the changes needed?

Without AQE, Spark always waits for the broadcast to finish before submitting the shuffle map tasks. With AQE enabled, `getFinalPhysicalPlan` traverses the physical plan bottom-up, creates query stages for the materialized parts via `createQueryStages`, and materializes those newly created query stages, which submits the map stages or broadcasts. When a `ShuffleQueryStage` is materialized before a `BroadcastQueryStage`, the map job and the broadcast job are submitted almost at the same time, but the map stage will hold all the computing resources. If the map stage runs slowly (lots of data to process with limited resources), the broadcast job cannot be started (and finished) within `spark.sql.broadcastTimeout`, which fails the whole job (introduced in SPARK-31475). The workaround of increasing `spark.sql.broadcastTimeout` is neither sensible nor graceful, because the data to broadcast is very small.

https://github.com/apache/spark/pull/30998 proposed a solution that sorts the new stages by class type so that `materialize()` is called on `BroadcastQueryStage`s before the others. However, that solution is not airtight and was reverted because of a flaky UT. The order of the `materialize()` calls determines the order in which the jobs are submitted in normal circumstances, but the guarantee is not strict, since the broadcast job and the shuffle map job are submitted from different threads:

1. For the broadcast job, `doPrepare()` is called on the main thread, and the real materialization then starts on the "broadcast-exchange-0" thread pool, where `getByteArrayRdd().collect()` submits the collect job.
2. For the shuffle map job, `ShuffleExchangeExec.mapOutputStatisticsFuture()` calls `sparkContext.submitMapStage()` directly on the main thread to submit the map stage.

Step 1 is triggered before step 2, so in normal cases the broadcast job is submitted first. But we cannot control how fast the two threads run: the "broadcast-exchange-0" thread may run a little slower than the main thread, in which case the map stage is submitted first. So there is still a risk that the shuffle map job is scheduled before the broadcast job.
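To make the intended ordering concrete, here is a minimal, self-contained Scala sketch of the pattern the PR adopts. The stage classes and helper names below are illustrative stand-ins, not the actual `AdaptiveSparkPlanExec` code; the real change works on the existing `QueryStageExec` subclasses and their `materialize()` futures.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object MaterializationOrderSketch {
  // Stand-ins for the query stage types; names are illustrative only.
  sealed trait Stage { def materialize()(implicit ec: ExecutionContext): Future[Unit] }
  final case class BroadcastStage(name: String) extends Stage {
    def materialize()(implicit ec: ExecutionContext): Future[Unit] =
      Future { println(s"broadcast $name: collect + broadcast") }
  }
  final case class ShuffleStage(name: String) extends Stage {
    def materialize()(implicit ec: ExecutionContext): Future[Unit] =
      Future { println(s"shuffle $name: submit map stage") }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    val newStages: Seq[Stage] =
      Seq(ShuffleStage("a"), BroadcastStage("b"), BroadcastStage("c"))

    // Split the newly created stages into broadcast stages and the rest.
    val (broadcastStages, otherStages) =
      newStages.partition(_.isInstanceOf[BroadcastStage])

    // 1. Kick off all broadcast materializations in parallel ...
    val broadcastFutures = broadcastStages.map(_.materialize())
    // 2. ... and wait for them to finish before touching the shuffle stages,
    //    so the small broadcast jobs cannot be starved by the big map stage.
    Await.result(Future.sequence(broadcastFutures), 5.minutes)

    // 3. Only now submit the shuffle map stages.
    otherStages.foreach(_.materialize())
    pool.shutdown()
  }
}
```

The essential point is step 2: the shuffle stages are not materialized until every broadcast future has completed, which is what removes the cross-thread scheduling race described above.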
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

1. Added a unit test.
2. Tested the code in a dev environment against the scenario described in https://issues.apache.org/jira/browse/SPARK-33933.
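For reference, the kind of setup used to reproduce the original timeout in a constrained environment looks roughly like the following. This is an assumed, illustrative sketch, not the unit test added in this PR: the table names and sizes are made up, `local[2]` merely stands in for a resource-limited cluster, and reproduction is timing-dependent since it relies on the map stage for the large table saturating all task slots.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastTimeoutRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("aqe-broadcast-timeout-repro")
      // Few cores, so the shuffle map stage for the large table can occupy
      // every slot and starve the broadcast collect jobs.
      .master("local[2]")
      .config("spark.sql.adaptive.enabled", "true")
      // 10 MB threshold keeps the tiny dimension tables broadcastable.
      .config("spark.sql.autoBroadcastJoinThreshold", "10485760")
      // A short timeout (seconds) makes the starvation visible quickly.
      .config("spark.sql.broadcastTimeout", "30")
      .getOrCreate()
    import spark.implicits._

    // Large fact table vs. two tiny in-memory dimension tables.
    spark.range(0L, 50000000L)
      .selectExpr("id", "id % 1000 AS name", "id AS value")
      .createOrReplaceTempView("a")
    (0 until 1000).map(i => (i.toLong, i.toLong)).toDF("id", "name").createOrReplaceTempView("b")
    (0 until 1000).map(i => (i.toLong, i.toLong)).toDF("name", "id").createOrReplaceTempView("c")

    // Without the fix, the broadcast jobs for b and c may queue behind the
    // map stage for a and hit spark.sql.broadcastTimeout.
    spark.sql(
      """SELECT a.id, count(a.value)
        |FROM a JOIN b ON a.id = b.id JOIN c ON a.name = c.name
        |GROUP BY a.id""".stripMargin)
      .write.format("noop").mode("overwrite").save()

    spark.stop()
  }
}
```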
