Author: tgraves
Date: Fri Jul 20 20:21:49 2012
New Revision: 1363937

URL: http://svn.apache.org/viewvc?rev=1363937&view=rev
Log:
svn merge --change -1363456 for reverting MAPREDUCE-4423
Removed: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1363937&r1=1363936&r2=1363937&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jul 20 20:21:49 2012 @@ -343,9 +343,6 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4448. Fix NM crash during app cleanup if aggregation didn't init. (Jason Lowe via daryn) - MAPREDUCE-4423. 
Potential infinite fetching of map output (Robert Evans - via tgraves) - Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1363937&r1=1363936&r2=1363937&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Jul 20 20:21:49 2012 @@ -49,8 +49,7 @@ import org.apache.hadoop.mapreduce.task. import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; -import com.google.common.annotations.VisibleForTesting; - +@SuppressWarnings({"deprecation"}) class Fetcher<K,V> extends Thread { private static final Log LOG = LogFactory.getLog(Fetcher.class); @@ -176,18 +175,13 @@ class Fetcher<K,V> extends Thread { } } - @VisibleForTesting - protected HttpURLConnection openConnection(URL url) throws IOException { - return (HttpURLConnection)url.openConnection(); - } - /** * The crux of the matter... * * @param host {@link MapHost} from which we need to * shuffle available map-outputs. 
*/ - protected void copyFromHost(MapHost host) throws IOException { + private void copyFromHost(MapHost host) throws IOException { // Get completed maps on 'host' List<TaskAttemptID> maps = scheduler.getMapsForHost(host); @@ -197,11 +191,9 @@ class Fetcher<K,V> extends Thread { return; } - if(LOG.isDebugEnabled()) { - LOG.debug("Fetcher " + id + " going to fetch from " + host); - for (TaskAttemptID tmp: maps) { - LOG.debug(tmp); - } + LOG.debug("Fetcher " + id + " going to fetch from " + host); + for (TaskAttemptID tmp: maps) { + LOG.debug(tmp); } // List of maps to be fetched yet @@ -213,7 +205,7 @@ class Fetcher<K,V> extends Thread { try { URL url = getMapOutputURL(host, maps); - HttpURLConnection connection = openConnection(url); + HttpURLConnection connection = (HttpURLConnection)url.openConnection(); // generate hash of the url String msgToEncode = SecureShuffleUtils.buildMsgFrom(url); @@ -274,24 +266,17 @@ class Fetcher<K,V> extends Thread { try { // Loop through available map-outputs and fetch them - // On any error, faildTasks is not null and we exit - // after putting back the remaining maps to the - // yet_to_be_fetched list and marking the failed tasks. 
- TaskAttemptID[] failedTasks = null; - while (!remaining.isEmpty() && failedTasks == null) { - failedTasks = copyMapOutput(host, input, remaining); - } - - if(failedTasks != null) { - for(TaskAttemptID left: failedTasks) { - scheduler.copyFailed(left, host, true); - } + // On any error, good becomes false and we exit after putting back + // the remaining maps to the yet_to_be_fetched list + boolean good = true; + while (!remaining.isEmpty() && good) { + good = copyMapOutput(host, input, remaining); } IOUtils.cleanup(LOG, input); // Sanity check - if (failedTasks == null && !remaining.isEmpty()) { + if (good && !remaining.isEmpty()) { throw new IOException("server didn't return all expected map outputs: " + remaining.size() + " left."); } @@ -300,9 +285,10 @@ class Fetcher<K,V> extends Thread { scheduler.putBackKnownMapOutput(host, left); } } - } + + } - private TaskAttemptID[] copyMapOutput(MapHost host, + private boolean copyMapOutput(MapHost host, DataInputStream input, Set<TaskAttemptID> remaining) { MapOutput<K,V> mapOutput = null; @@ -324,15 +310,14 @@ class Fetcher<K,V> extends Thread { } catch (IllegalArgumentException e) { badIdErrs.increment(1); LOG.warn("Invalid map id ", e); - //Don't know which one was bad, so consider all of them as bad - return remaining.toArray(new TaskAttemptID[remaining.size()]); + return false; } // Do some basic sanity verification if (!verifySanity(compressedLength, decompressedLength, forReduce, remaining, mapId)) { - return new TaskAttemptID[] {mapId}; + return false; } LOG.debug("header: " + mapId + ", len: " + compressedLength + @@ -344,7 +329,7 @@ class Fetcher<K,V> extends Thread { // Check if we can shuffle *now* ... if (mapOutput.getType() == Type.WAIT) { LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); - return new TaskAttemptID[] {mapId}; + return false; } // Go! 
@@ -366,18 +351,14 @@ class Fetcher<K,V> extends Thread { // Note successful shuffle remaining.remove(mapId); metrics.successFetch(); - return null; + return true; } catch (IOException ioe) { ioErrs.increment(1); if (mapId == null || mapOutput == null) { LOG.info("fetcher#" + id + " failed to read map header" + mapId + " decomp: " + decompressedLength + ", " + compressedLength, ioe); - if(mapId == null) { - return remaining.toArray(new TaskAttemptID[remaining.size()]); - } else { - return new TaskAttemptID[] {mapId}; - } + return false; } LOG.info("Failed to shuffle output of " + mapId + @@ -385,8 +366,9 @@ class Fetcher<K,V> extends Thread { // Inform the shuffle-scheduler mapOutput.abort(); + scheduler.copyFailed(mapId, host, true); metrics.failedFetch(); - return new TaskAttemptID[] {mapId}; + return false; } }