maryannxue commented on issue #26633: [SPARK-29994][CORE] Add WILDCARD task location URL: https://github.com/apache/spark/pull/26633#issuecomment-558804630

Changing the default locality wait time to 0 (or whatever the value is) rests on the assumption that no workload suffers a serious penalty from a locality miss, because we are looking at shuffles only. There can be exceptions where locality does matter a lot and is worth some wait time.

Back to the local shuffle reader. I'll explain from the very beginning what it does.

**Background**: In Adaptive Query Execution, we may decide to change a sort-merge-join to a broadcast-hash-join in the middle of query execution. A sort-merge-join usually (not always) comes with both sides topped by a shuffle and a sort. Once the shuffle is materialized (the map stage is done), we reach a natural sync point where we can try re-planning based on the shuffle stats and data metrics we've got so far. So if one side turns out small enough to be broadcast, we can change the SMJ into a BHJ, but note that the shuffles underneath have either been completed or at least been started.

**LocalShuffleReader**: We obviously can't save the materialization (map stage) cost, but what we can do is avoid the network cost of the shuffle's reduce stage, because a BHJ does not care about the shuffle partitioning at all. The LocalShuffledRowRDD alters the behavior of a shuffle reduce stage by fetching data from a single mapper only and setting the preferred location to wherever that mapper's output is. Note that there can be multiple LocalShuffledRowRDD partitions per mapper, which means a LocalShuffledRowRDD partition can fetch either all of the output or just part of the output of a single mapper.

**Problem**: The number of shuffle mappers is decided by the number of partitions of the upstream operator, and it can sometimes be small, especially when the upstream operator is a file scan. If it is smaller than the number of worker nodes (we are talking about "hosts" here; it doesn't matter much how many executors run per host, because the locality preference specifies hosts only), the tasks will all be scheduled on those hosts and not the others. For example, say we have 10 worker nodes, each with 4 executors, one of the join tables has 5 partitions after the scan, and the default shuffle partition number is 200. After the shuffle map stage, we get 5 mappers, each producing 200 shuffle blocks. In the reduce stage, if we use a regular ShuffledRowRDD, we get 200 shuffle reduce tasks; assuming there is no skew, these 200 tasks have no location preference and are evenly scheduled across the 4 * 10 = 40 executors. If instead we use the LocalShuffledRowRDD, then to match the parallelism we'd like to have 200 output partitions as well, which means there will be 200 / 5 = 40 tasks fetching data from each mapper. Yet, because of the locality preference, all these 200 tasks will be scheduled on 5 hosts only, while the other 5 hosts will not share the workload.
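To make the arithmetic in the example concrete, here is a minimal, self-contained Scala sketch (plain Scala, not Spark internals; `LocalReadPartition`, `mapperHost`, and the host names are hypothetical) that models 5 mappers, a target parallelism of 200, and a per-mapper preferred host, and then shows that all 200 locality-preferring tasks land on only 5 of the 10 hosts:

```scala
// Sketch of the scheduling arithmetic described above; names are hypothetical.
object LocalShuffleReaderLocalitySketch {

  // One local shuffle read task: it reads one slice of exactly one mapper's
  // output, so its only preferred host is the host that ran that mapper.
  case class LocalReadPartition(mapperId: Int, sliceId: Int, preferredHost: String)

  def main(args: Array[String]): Unit = {
    val numHosts          = 10   // 10 worker nodes
    val executorsPerHost  = 4    // 4 executors each => 40 executors
    val numMappers        = 5    // scan side produced 5 partitions
    val targetParallelism = 200  // default shuffle partition number

    // Assume the 5 map tasks ran on 5 distinct hosts (host-0 .. host-4).
    val mapperHost: Map[Int, String] =
      (0 until numMappers).map(m => m -> s"host-$m").toMap

    // Regular ShuffledRowRDD: 200 reduce tasks with no location preference,
    // spread across all 40 executors.
    println(s"regular reduce tasks: $targetParallelism over " +
      s"${numHosts * executorsPerHost} executors, no locality preference")

    // LocalShuffledRowRDD: to keep the same parallelism, split each mapper's
    // output into 200 / 5 = 40 slices, one task per slice, each preferring
    // the host that holds that mapper's output.
    val slicesPerMapper = targetParallelism / numMappers
    val localReadPartitions =
      for {
        m <- 0 until numMappers
        s <- 0 until slicesPerMapper
      } yield LocalReadPartition(m, s, mapperHost(m))

    val tasksPerHost = localReadPartitions
      .groupBy(_.preferredHost)
      .map { case (host, parts) => host -> parts.size }

    println(s"local shuffle read tasks: ${localReadPartitions.size}")
    println(s"hosts carrying them: ${tasksPerHost.size} of $numHosts")
    tasksPerHost.toSeq.sortBy(_._1).foreach { case (h, n) =>
      println(s"  $h -> $n tasks")
    }
  }
}
```

Running this prints 200 local read tasks concentrated on 5 of the 10 hosts (40 tasks each), which is exactly the imbalance the wait-time / WILDCARD-location discussion is trying to address.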
