zhongyu09 opened a new pull request #31269: URL: https://github.com/apache/spark/pull/31269
### What changes were proposed in this pull request? This PR is the same as https://github.com/apache/spark/pull/30998, but with a better UT. In AdaptiveSparkPlanExec.getFinalPhysicalPlan, when newStages are generated, sort the new stages by class type to make sure BroadcastQueryState precede others. This partial fix only grantee the start of materialization for BroadcastQueryStage is prior to others, but because the submission of collect job for broadcasting is run in another thread, the issue is not completely solved. ### Why are the changes needed? When enable AQE, in getFinalPhysicalPlan, spark traversal the physical plan bottom up and create query stage for materialized part by createQueryStages and materialize those new created query stages to submit map stages or broadcasting. When ShuffleQueryStage are materializing before BroadcastQueryStage, the map stage(job) and broadcast job are submitted almost at the same time, but map stage will hold all the computing resources. If the map stage runs slow (when lots of data needs to process and the resource is limited), the broadcast job cannot be started(and finished) before spark.sql.broadcastTimeout, thus cause whole job failed (introduced in SPARK-31475). The workaround to increase spark.sql.broadcastTimeout doesn't make sense and graceful, because the data to broadcast is very small. The order of calling materialize can guarantee that the order of task to be scheduled in normal circumstances, but, the guarantee is not strict since the submit of broadcast job and shuffle map job are in different thread. 1. for broadcast job, call doPrepare() in main thread, and then start the real materialization in "broadcast-exchange-0" thread pool: calling getByteArrayRdd().collect() to submit collect job 2. for shuffle map job, call ShuffleExchangeExec.mapOutputStatisticsFuture() which call sparkContext.submitMapStage() directly in main thread to submit map stage 1 is trigger before 2, so in normal cases, the broadcast job will be submit first. However, we can not control how fast the two thread runs, so the "broadcast-exchange-0" thread could run a little bit slower than main thread, result in map stage submit first. So there's still risk for the shuffle map job schedule earlier before broadcast job. Since completely fix the issue is complex and might introduce major changes, we need more time to follow up. This partial fix is better than do nothing, it resolved most cases in SPARK-33933. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Add UT ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
