Author: tgraves
Date: Fri Jul 27 01:48:54 2012
New Revision: 1366260

URL: http://svn.apache.org/viewvc?rev=1366260&view=rev
Log:
merge -r 1366257:1366258 from trunk. FIXES: MAPREDUCE-4423
Added: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java - copied unchanged from r1366258, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1366260&r1=1366259&r2=1366260&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Jul 27 01:48:54 2012 @@ -633,6 +633,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4467. IndexCache failures due to missing synchronization (Kihwal Lee via tgraves) + MAPREDUCE-4423. 
Potential infinite fetching of map output (Robert Evans + via tgraves) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1366260&r1=1366259&r2=1366260&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Jul 27 01:48:54 2012 @@ -21,11 +21,12 @@ import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; -import java.net.HttpURLConnection; import java.net.URLConnection; import java.security.GeneralSecurityException; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -53,7 +54,8 @@ import org.apache.hadoop.mapreduce.task. 
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; -@SuppressWarnings({"deprecation"}) +import com.google.common.annotations.VisibleForTesting; + class Fetcher<K,V> extends Thread { private static final Log LOG = LogFactory.getLog(Fetcher.class); @@ -199,6 +201,7 @@ class Fetcher<K,V> extends Thread { } } + @VisibleForTesting protected HttpURLConnection openConnection(URL url) throws IOException { HttpURLConnection conn = (HttpURLConnection) url.openConnection(); if (sslShuffle) { @@ -209,17 +212,18 @@ class Fetcher<K,V> extends Thread { throw new IOException(ex); } httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier()); - } - return conn; } - + return conn; + } + /** * The crux of the matter... * * @param host {@link MapHost} from which we need to * shuffle available map-outputs. */ - private void copyFromHost(MapHost host) throws IOException { + @VisibleForTesting + protected void copyFromHost(MapHost host) throws IOException { // Get completed maps on 'host' List<TaskAttemptID> maps = scheduler.getMapsForHost(host); @@ -229,9 +233,9 @@ class Fetcher<K,V> extends Thread { return; } - LOG.debug("Fetcher " + id + " going to fetch from " + host); - for (TaskAttemptID tmp: maps) { - LOG.debug(tmp); + if(LOG.isDebugEnabled()) { + LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: " + + maps); } // List of maps to be fetched yet @@ -304,17 +308,25 @@ class Fetcher<K,V> extends Thread { try { // Loop through available map-outputs and fetch them - // On any error, good becomes false and we exit after putting back - // the remaining maps to the yet_to_be_fetched list - boolean good = true; - while (!remaining.isEmpty() && good) { - good = copyMapOutput(host, input, remaining); + // On any error, faildTasks is not null and we exit + // after putting back the remaining maps to the + // yet_to_be_fetched list and marking the failed tasks. 
+ TaskAttemptID[] failedTasks = null; + while (!remaining.isEmpty() && failedTasks == null) { + failedTasks = copyMapOutput(host, input, remaining); + } + + if(failedTasks != null && failedTasks.length > 0) { + LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks)); + for(TaskAttemptID left: failedTasks) { + scheduler.copyFailed(left, host, true); + } } IOUtils.cleanup(LOG, input); // Sanity check - if (good && !remaining.isEmpty()) { + if (failedTasks == null && !remaining.isEmpty()) { throw new IOException("server didn't return all expected map outputs: " + remaining.size() + " left."); } @@ -323,10 +335,11 @@ class Fetcher<K,V> extends Thread { scheduler.putBackKnownMapOutput(host, left); } } - - } + } - private boolean copyMapOutput(MapHost host, + private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0]; + + private TaskAttemptID[] copyMapOutput(MapHost host, DataInputStream input, Set<TaskAttemptID> remaining) { MapOutput<K,V> mapOutput = null; @@ -348,18 +361,21 @@ class Fetcher<K,V> extends Thread { } catch (IllegalArgumentException e) { badIdErrs.increment(1); LOG.warn("Invalid map id ", e); - return false; + //Don't know which one was bad, so consider all of them as bad + return remaining.toArray(new TaskAttemptID[remaining.size()]); } // Do some basic sanity verification if (!verifySanity(compressedLength, decompressedLength, forReduce, remaining, mapId)) { - return false; + return new TaskAttemptID[] {mapId}; } - LOG.debug("header: " + mapId + ", len: " + compressedLength + - ", decomp len: " + decompressedLength); + if(LOG.isDebugEnabled()) { + LOG.debug("header: " + mapId + ", len: " + compressedLength + + ", decomp len: " + decompressedLength); + } // Get the location for the map output - either in-memory or on-disk mapOutput = merger.reserve(mapId, decompressedLength, id); @@ -367,7 +383,8 @@ class Fetcher<K,V> extends Thread { // Check if we can shuffle *now* ... 
if (mapOutput.getType() == Type.WAIT) { LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ..."); - return false; + //Not an error but wait to process data. + return EMPTY_ATTEMPT_ID_ARRAY; } // Go! @@ -389,24 +406,27 @@ class Fetcher<K,V> extends Thread { // Note successful shuffle remaining.remove(mapId); metrics.successFetch(); - return true; + return null; } catch (IOException ioe) { ioErrs.increment(1); if (mapId == null || mapOutput == null) { LOG.info("fetcher#" + id + " failed to read map header" + mapId + " decomp: " + decompressedLength + ", " + compressedLength, ioe); - return false; + if(mapId == null) { + return remaining.toArray(new TaskAttemptID[remaining.size()]); + } else { + return new TaskAttemptID[] {mapId}; + } } - LOG.info("Failed to shuffle output of " + mapId + + LOG.warn("Failed to shuffle output of " + mapId + " from " + host.getHostName(), ioe); // Inform the shuffle-scheduler mapOutput.abort(); - scheduler.copyFailed(mapId, host, true); metrics.failedFetch(); - return false; + return new TaskAttemptID[] {mapId}; } }