yaooqinn opened a new pull request #25941: [SPARK-29257][Core][Shuffle] Use task attempt number as noop reduce id to handle disk failures during shuffle
URL: https://github.com/apache/spark/pull/25941
 
 
   …
   
   
   ### What changes were proposed in this pull request?
   The noop reduce id used to be 0, which results in a fixed index or data file name for each shuffle map task. Suppose the nodes in a cluster have more than one disk and one of those disks is broken. If the fixed file name's hash code modulo the number of disks happens to point to the broken disk, every attempt of the task will inevitably access it. This leads to pointless task failures and can even bring down the whole job. This PR changes the noop reduce id to the task attempt number, so that each attempt produces a different file name and a retry has a chance to land on a healthy disk.
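   A minimal sketch of the naming change, assuming Spark's `shuffle_<shuffleId>_<mapId>_<reduceId>.index` / `.data` naming for a map task's index and data files. The helpers below (`index_file_name`, `data_file_name`, `names_before`, `names_after`) are hypothetical Python illustrations, not Spark APIs.

```python
NOOP_REDUCE_ID = 0  # the fixed reduce id used before this change


def index_file_name(shuffle_id: int, map_id: int, reduce_id: int) -> str:
    """Hypothetical helper mirroring the shuffle index file naming."""
    return f"shuffle_{shuffle_id}_{map_id}_{reduce_id}.index"


def data_file_name(shuffle_id: int, map_id: int, reduce_id: int) -> str:
    """Hypothetical helper mirroring the shuffle data file naming."""
    return f"shuffle_{shuffle_id}_{map_id}_{reduce_id}.data"


def names_before(shuffle_id: int, map_id: int, attempt_number: int) -> tuple:
    # Old behaviour: attempt_number is ignored, so every attempt gets the same names.
    return (index_file_name(shuffle_id, map_id, NOOP_REDUCE_ID),
            data_file_name(shuffle_id, map_id, NOOP_REDUCE_ID))


def names_after(shuffle_id: int, map_id: int, attempt_number: int) -> tuple:
    # Proposed behaviour: the attempt number takes the place of the noop reduce id.
    return (index_file_name(shuffle_id, map_id, attempt_number),
            data_file_name(shuffle_id, map_id, attempt_number))


if __name__ == "__main__":
    for attempt in range(4):
        print(attempt, names_before(3, 7, attempt), names_after(3, 7, attempt))
```

   Only the naming is modelled here; because the attempt number becomes part of the file name, each retry hashes a different string.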
   
   
   ### Why are the changes needed?
   
   We have an HDFS/YARN cluster of about 2k~3k nodes; each node has 12 local disks of 8 TB each, used for both storage and shuffle. Occasionally one or more disks go bad during computation. Sometimes this causes a job-level failure, sometimes it does not.
   
   The picture below shows a job that failed because all 4 attempts of one task were scheduled to the same node, and each failed with almost the same exception while writing the temporary index file to the same bad disk.
   This has two causes:
   
   1. As the figure shows, this node has the best data locality for reading the task's input from HDFS. With `spark.locality.wait` at its default of 3s, there is a high probability that all of these attempts are scheduled to this node.
   2. The index and data file names for a particular shuffle map task are fixed. They are formed from the shuffle id, the map id, and the noop reduce id, which is always 0. The root local dir is picked as the fixed file name's non-negative hash code modulo the number of disks, so this choice is fixed as well. Even if only one of our 12 disks is broken, once the broken one is picked, every following attempt of this task will inevitably pick it too.
   
   
![image](https://user-images.githubusercontent.com/8326978/65672636-f0610480-e07b-11e9-8939-1bc664ddaed7.png)
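   The directory choice described in reason 2 can be sketched in Python under these assumptions: file names hash with Java's `String.hashCode` (32-bit overflow), the hash is made non-negative by mapping `Integer.MIN_VALUE` to 0 (as Spark's `Utils.nonNegativeHash` does), and the root local dir is that hash modulo the number of disks. The function names and the 12-disk default are illustrative assumptions; Spark's `DiskBlockManager` also picks a sub-directory, which this sketch omits.

```python
INT_MIN = -(1 << 31)


def java_string_hash(s: str) -> int:
    """Java's String.hashCode as a signed 32-bit int.

    Assumes s holds only BMP characters (true for shuffle file names),
    so each Python character corresponds to one Java UTF-16 char.
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # keep 32 bits, like Java int overflow
    return h - (1 << 32) if h >= (1 << 31) else h  # reinterpret as signed


def non_negative_hash(s: str) -> int:
    """abs(hashCode), except Integer.MIN_VALUE (whose abs overflows) maps to 0."""
    h = java_string_hash(s)
    return 0 if h == INT_MIN else abs(h)


def local_dir_index(file_name: str, num_disks: int) -> int:
    """Index of the root local dir (disk) that a file name hashes to."""
    return non_negative_hash(file_name) % num_disks


def index_dir_for(shuffle_id: int, map_id: int, reduce_id: int, num_disks: int = 12) -> int:
    # Hypothetical helper: build the index file name, then pick its disk.
    return local_dir_index(f"shuffle_{shuffle_id}_{map_id}_{reduce_id}.index", num_disks)


if __name__ == "__main__":
    shuffle_id, map_id = 0, 5
    # Old scheme: the reduce id is always 0, so every attempt hashes to one disk.
    print("fixed:", [index_dir_for(shuffle_id, map_id, 0) for _ in range(4)])
    # New scheme: reduce id = attempt number, so retries may hash to other disks.
    print("per-attempt:", [index_dir_for(shuffle_id, map_id, a) for a in range(4)])
```

   With a fixed name the disk index is the same on every attempt. With the attempt number in the name, successive attempts usually hash to different disks, though any two attempts can still collide on the same one.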
   
   ### Does this PR introduce any user-facing change?
   NO
   
   ### How was this patch tested?
   
   Added some unit tests.
   
   ping @cloud-fan @srowen @squito 
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.