maryannxue commented on issue #26633: [SPARK-29994][CORE] Add WILDCARD task location
URL: https://github.com/apache/spark/pull/26633#issuecomment-558804630
 
 
   Changing the default locality wait time to 0 (or whatever value we pick) rests on the assumption that no workload suffers a serious penalty from a locality miss, because we are looking at shuffles only. There can be exceptions where locality does matter a lot and would be worth some wait time.
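   For what it's worth, the wait can also be relaxed per application instead of changing the cluster-wide default; a minimal sketch (the `0s` value is purely illustrative):

```scala
// Sketch only: lowering spark.locality.wait for one shuffle-heavy application
// rather than changing the global default. The value is illustrative.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-heavy-job")
  .config("spark.locality.wait", "0s")  // don't wait for a preferred-location slot
  .getOrCreate()
```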
   
   Back to the local shuffle reader. I'll explain from the very beginning what 
it does.
   **Background**: In Adaptive Query Execution, we may decide to change a sort-merge-join into a broadcast-hash-join in the middle of query execution. A sort-merge-join usually (not always) has both sides topped by a shuffle and a sort. Once the shuffle is materialized (the map stage is done), we reach a natural sync point where we can try re-planning based on the shuffle stats and data metrics collected so far. So if one side turns out small enough to be broadcast, we can change the SMJ into a BHJ, but note that the shuffles underneath have either completed or at least started.
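   As a rough illustration only (assuming an existing SparkSession `spark` on Spark 3.x; the tables and filter are made up), this is the kind of join where the runtime switch happens:

```scala
// With AQE enabled, a join planned statically as a sort-merge-join can be
// re-planned as a broadcast-hash-join once the map stages finish and the
// runtime shuffle stats show one side is small enough to broadcast.
import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.adaptive.enabled", "true")

val big = spark.range(0L, 100000000L).withColumnRenamed("id", "k")
// After a selective filter the static size estimate is often far too large,
// which is exactly when the runtime SMJ-to-BHJ conversion pays off.
val small = spark.range(0L, 10000000L)
  .filter(col("id") % 1000 === 0)
  .withColumnRenamed("id", "k")

big.join(small, "k").count()
```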
   **LocalShuffleReader**: We obviously can't save the materialization (map stage) cost, but what we can do is avoid the network cost of the reduce stage of the shuffle, because a BHJ does not care about the shuffle partitioning at all. The LocalShuffledRowRDD alters the behavior of a shuffle reduce stage by fetching data from a single mapper only and setting the preferred location to wherever that mapper's output is. Note that there can be multiple LocalShuffledRowRDD partitions per mapper, which means a LocalShuffledRowRDD partition can fetch either all of the output or only part of the output of a single mapper.
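   A very rough sketch of the idea, not the actual LocalShuffledRowRDD code (the `mapOutputHosts` and `readMapOutput` hooks are made-up stand-ins for the internal map-output lookup and fetch): each reduce-side partition is bound to one mapper and advertises that mapper's host as its preferred location.

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition: reads all of, or a slice of, one mapper's output.
case class LocalReadPartition(index: Int, mapIndex: Int) extends Partition

// Simplified stand-in for LocalShuffledRowRDD. The key point is only that
// getPreferredLocations returns the host where the map output was written,
// which is what steers the scheduler onto the mapper hosts.
class LocalReadRDD[T: ClassTag](
    sc: SparkContext,
    numMappers: Int,
    partitionsPerMapper: Int,
    mapOutputHosts: Int => Seq[String],        // assumed lookup of a mapper's host
    readMapOutput: (Int, Int) => Iterator[T])  // assumed fetch of a mapper-output slice
  extends RDD[T](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numMappers * partitionsPerMapper) { i =>
      LocalReadPartition(i, i / partitionsPerMapper)
    }

  // Every partition prefers the host holding its mapper's output.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    mapOutputHosts(split.asInstanceOf[LocalReadPartition].mapIndex)

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val p = split.asInstanceOf[LocalReadPartition]
    readMapOutput(p.mapIndex, p.index % partitionsPerMapper)
  }
}
```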
   **Problem**: The number of shuffle mappers is decided by the number of partitions of the upstream operator, and it can sometimes be small, especially when the upstream operator is a file scan. If it is smaller than the number of worker nodes (we are talking about "hosts" here, and it doesn't matter much how many executors run per host, because the locality preference specifies hosts only), the tasks will all be scheduled on those hosts and none on the others. For example, suppose we have 10 worker nodes with 4 executors each, one of the join tables has 5 partitions after the scan, and the default shuffle partition number is 200. After the shuffle map stage we'll get 5 mappers, each producing 200 shuffle blocks. In the reduce stage, if we use a regular ShuffledRowRDD, we'll get 200 shuffle reduce tasks; assuming there's no skew, those 200 tasks will have no location preference and be evenly scheduled across the 4 * 10 = 40 executors. If instead we use the LocalShuffledRowRDD, in order to match parallelism we'd like to have 200 output partitions too, which means there will be 200 / 5 = 40 tasks fetching data from each mapper. Yet, because of the locality preference, all 200 tasks will be scheduled on 5 hosts only, while the other 5 hosts will not share the workload.
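   Just to make the imbalance concrete, a back-of-the-envelope sketch with the numbers above (host names are made up):

```scala
// 5 mappers, 200 / 5 = 40 local-read tasks per mapper, 10 hosts in the cluster.
val mapperHosts    = (1 to 5).map(i => s"host$i")   // where the 5 map outputs live
val allHosts       = (1 to 10).map(i => s"host$i")  // the whole cluster
val tasksPerMapper = 200 / 5

// With strict locality, every local-read task lands on its mapper's host:
val load = mapperHosts.map(_ -> tasksPerMapper).toMap.withDefaultValue(0)
allHosts.foreach(h => println(s"$h -> ${load(h)} tasks"))
// host1..host5 get 40 tasks each, host6..host10 get 0: half the cluster sits idle.
```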
