zhongyu09 opened a new pull request #31167:
URL: https://github.com/apache/spark/pull/31167


   ### What changes were proposed in this pull request?
   In AdaptiveSparkPlanExec.getFinalPhysicalPlan, when new stages are 
generated, materialize the BroadcastQueryStage stages first and wait for their 
materialization to finish before materializing the other (ShuffleQueryStage) 
stages.
   This makes sure the broadcast jobs are scheduled and finished before the map 
jobs, so a broadcast job never waits so long for scheduling that it hits the 
broadcast timeout. This matches the behavior of non-AQE queries.
   Ideally we only want to guarantee that the broadcast jobs are scheduled 
before the map jobs. However, that is difficult to control and could require 
large changes to spark-core. The trade-off made here is to wait for the 
broadcast jobs to finish before materializing the ShuffleQueryStage stages, as 
sketched below.
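
   A minimal sketch of the idea in Scala, assuming the surrounding context of 
getFinalPhysicalPlan (newStages and the stage classes follow Spark's code, but 
event handling, error handling, and stage reuse are omitted here):
   ```scala
   // Illustrative sketch only -- not the exact diff of this PR.
   import scala.concurrent.Await
   import scala.concurrent.duration.Duration

   // Split the newly created stages into broadcast stages and the rest.
   val (broadcastStages, otherStages) = newStages.partition {
     case _: BroadcastQueryStageExec => true
     case _ => false
   }

   // Kick off the broadcast stages first and block until they finish...
   broadcastStages.map(_.materialize()).foreach(Await.ready(_, Duration.Inf))

   // ...then materialize the shuffle map stages, whose tasks would
   // otherwise occupy the executors and starve the broadcast jobs.
   otherStages.foreach(_.materialize())
   ```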
   
   Consider the following case, where a is a large table and b and c are very 
small in-memory dimension tables.
   ```
   SELECT a.id, a.name, b.name, c.name, count(a.value) 
   FROM a 
   JOIN b on a.id = b.id 
   JOIN c on a.name = c.name 
   GROUP BY a.id
   ```
   For non-AQE:
   1. run the collect job for b, then broadcast b
   2. run the collect job for c, then broadcast c
   3. submit the job, which contains 2 stages
   
   For current AQE:
   1. submit 3 jobs (the shuffle map stage for a, collect and broadcast b, 
collect and broadcast c) almost at the same time
   2. when all of them have finished, run the result stage
   
   For AQE with this PR:
   1. submit 2 jobs (collect and broadcast b, collect and broadcast c) at the 
same time
   2. wait for the broadcasts of b and c to finish
   3. run the shuffle map stage
   4. run the result stage
   
   ### Why are the changes needed?
   Without AQE, we always wait for the broadcast to finish before submitting 
the shuffle map tasks.
   
   When AQE is enabled, getFinalPhysicalPlan traverses the physical plan 
bottom-up, creates query stages for the materialized parts via 
createQueryStages, and materializes those newly created query stages to submit 
the map stages or broadcasts. When a ShuffleQueryStage is materialized before a 
BroadcastQueryStage, the map stage (job) and the broadcast job are submitted 
almost at the same time, but the map stage holds all the computing resources. 
If the map stage runs slowly (when there is a lot of data to process and 
resources are limited), the broadcast job cannot be started (and finished) 
before spark.sql.broadcastTimeout, which fails the whole job (introduced in 
SPARK-31475).
   
   The workaround of increasing spark.sql.broadcastTimeout is neither sensible 
nor graceful, because the data to broadcast is very small.
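
   For reference, the workaround amounts to nothing more than raising the 
timeout (the value below is arbitrary; `spark` is an existing SparkSession):
   ```scala
   // Workaround only: raise spark.sql.broadcastTimeout (default: 300s).
   // This hides the scheduling problem rather than fixing it.
   spark.conf.set("spark.sql.broadcastTimeout", "1200")
   ```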
   
   https://github.com/apache/spark/pull/30998 proposed a solution that sorts 
the new stages by class type, so that materialize() is called for 
BroadcastQueryStage before the other stages. However, that solution is not 
watertight, and it was reverted because of a flaky UT. The order of the 
materialize() calls determines the order in which the tasks are scheduled under 
normal circumstances, but the guarantee is not strict, because the broadcast 
job and the shuffle map job are submitted from different threads:
   1. for the broadcast job, doPrepare() is called on the main thread, and the 
real materialization then starts in the "broadcast-exchange-0" thread pool, 
which calls getByteArrayRdd().collect() to submit the collect job
   2. for the shuffle map job, ShuffleExchangeExec.mapOutputStatisticsFuture() 
calls sparkContext.submitMapStage() directly on the main thread to submit the 
map stage
   
   Step 1 is triggered before step 2, so in normal cases the broadcast job is 
submitted first. However, we cannot control how fast the two threads run, and 
the "broadcast-exchange-0" thread could run a little slower than the main 
thread, resulting in the map stage being submitted first. So there is still a 
risk that the shuffle map job is scheduled before the broadcast job, as the toy 
example below illustrates.
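
   A toy, non-Spark illustration of the race (the single-thread pool stands in 
for "broadcast-exchange-0"; nothing here is Spark API):
   ```scala
   import java.util.concurrent.Executors
   import scala.concurrent.{ExecutionContext, Future}

   // Stand-in for the "broadcast-exchange-0" pool.
   implicit val pool: ExecutionContext =
     ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

   // (1) is triggered first, but its body runs on the pool thread...
   Future { println("submit broadcast collect job") }

   // ...while (2) runs immediately on the calling thread, so it can
   // reach the scheduler first even though it was triggered second.
   println("submit shuffle map stage")
   ```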
   
   ### Does this PR introduce _any_ user-facing change?
   NO
   
   
   ### How was this patch tested?
   1. Added a UT
   2. Tested the code in a dev environment, see 
https://issues.apache.org/jira/browse/SPARK-33933

