zhongyu09 opened a new pull request #31269:
URL: https://github.com/apache/spark/pull/31269


   ### What changes were proposed in this pull request?
   This PR is the same as https://github.com/apache/spark/pull/30998, but with a better UT.
   In AdaptiveSparkPlanExec.getFinalPhysicalPlan, when newStages are generated, sort the new stages by class type to make sure BroadcastQueryStage instances precede the others.
   This partial fix only guarantees that the materialization of BroadcastQueryStage starts before the others; because the collect job for broadcasting is submitted from another thread, the issue is not completely solved.
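   A minimal sketch of the reordering idea (hedged: the exact patch may differ; `BroadcastQueryStageExec` and `QueryStageExec` are the internal classes from Spark's adaptive-execution package, and `newStages` stands for the stages collected in getFinalPhysicalPlan):
   ```scala
   import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, QueryStageExec}

   // Order newly created stages so broadcast stages are materialized first.
   // sortBy is stable, so the relative order of the remaining stages is kept.
   def sortForMaterialization(newStages: Seq[QueryStageExec]): Seq[QueryStageExec] =
     newStages.sortBy {
       case _: BroadcastQueryStageExec => 0 // broadcast stages first
       case _                          => 1 // shuffle and other stages after
     }
   ```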
   
   ### Why are the changes needed?
   When AQE is enabled, in getFinalPhysicalPlan, Spark traverses the physical plan bottom-up, creates query stages for the materializable parts via createQueryStages, and materializes those newly created query stages by submitting map stages or broadcast jobs. When a ShuffleQueryStage is materialized before a BroadcastQueryStage, the map stage (job) and the broadcast job are submitted at almost the same time, but the map stage will hold all the computing resources. If the map stage runs slowly (when there is a lot of data to process and resources are limited), the broadcast job cannot be started (and finished) before spark.sql.broadcastTimeout, which causes the whole job to fail (introduced in SPARK-31475).
   The workaround of increasing spark.sql.broadcastTimeout is neither sensible nor graceful, because the data to broadcast is very small.
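   For reference, the workaround looks like the following (a sketch; `spark` is assumed to be an active SparkSession, and the default timeout is 300 seconds). It only masks the scheduling race instead of fixing it:
   ```scala
   // Discouraged workaround: give the broadcast job more time to be scheduled
   // after the map stage releases resources.
   spark.conf.set("spark.sql.broadcastTimeout", "1200") // seconds; default 300
   ```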
   
   The order of calling materialize can determine the order in which tasks are scheduled under normal circumstances, but the guarantee is not strict, because the broadcast job and the shuffle map job are submitted from different threads:
   1. for the broadcast job, doPrepare() is called on the main thread, and then the real materialization starts in the "broadcast-exchange-0" thread pool, calling getByteArrayRdd().collect() to submit the collect job
   2. for the shuffle map job, ShuffleExchangeExec.mapOutputStatisticsFuture() calls sparkContext.submitMapStage() directly on the main thread to submit the map stage
   
   Step 1 is triggered before step 2, so in normal cases the broadcast job is submitted first.
   However, we cannot control how fast the two threads run, so the "broadcast-exchange-0" thread could run a little slower than the main thread, resulting in the map stage being submitted first. There is therefore still a risk that the shuffle map job is scheduled before the broadcast job, as illustrated by the toy sketch below.
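   A toy sketch of that race (hedged: it models only the threading, not Spark's actual scheduler; the single-thread pool stands in for "broadcast-exchange-0"):
   ```scala
   import java.util.concurrent.Executors
   import scala.concurrent.{ExecutionContext, Future}

   object RaceSketch extends App {
     val pool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

     // Step 1: triggered first, but runs asynchronously on the pool.
     Future { println("broadcast collect job submitted") }(pool)

     // Step 2: runs synchronously on the calling thread.
     println("map stage submitted")

     // Output order is nondeterministic; "map stage submitted" may print
     // first, mirroring the map stage being submitted before the broadcast.
     pool.shutdown()
   }
   ```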
   
   Since completely fixing the issue is complex and might introduce major changes, we need more time to follow up. This partial fix is better than doing nothing; it resolves most of the cases in SPARK-33933.
   
   ### Does this PR introduce _any_ user-facing change?
   NO
   
   
   ### How was this patch tested?
   Added a UT.

