Author: jlowe
Date: Thu Jul 25 19:38:49 2013
New Revision: 1507104

URL: http://svn.apache.org/r1507104
Log:
MAPREDUCE-5251. Reducer should not implicate map attempt if it has insufficient space to fetch map output. Contributed by Ashwin Shankar

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1507104&r1=1507103&r2=1507104&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Jul 25 19:38:49 2013 @@ -173,6 +173,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via jlowe) + MAPREDUCE-5251. Reducer should not implicate map attempt if it has + insufficient space to fetch map output (Ashwin Shankar via jlowe) + Release 2.1.1-beta - UNRELEASED INCOMPATIBLE CHANGES @@ -1237,6 +1240,9 @@ Release 0.23.10 - UNRELEASED MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via jlowe) + MAPREDUCE-5251. 
Reducer should not implicate map attempt if it has + insufficient space to fetch map output (Ashwin Shankar via jlowe) + Release 0.23.9 - 2013-07-08 INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1507104&r1=1507103&r2=1507104&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu Jul 25 19:38:49 2013 @@ -407,7 +407,14 @@ class Fetcher<K,V> extends Thread { } // Get the location for the map output - either in-memory or on-disk - mapOutput = merger.reserve(mapId, decompressedLength, id); + try { + mapOutput = merger.reserve(mapId, decompressedLength, id); + } catch (IOException ioe) { + // kill this reduce attempt + ioErrs.increment(1); + scheduler.reportLocalError(ioe); + return EMPTY_ATTEMPT_ID_ARRAY; + } // Check if we can shuffle *now* ... 
if (mapOutput == null) { Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java?rev=1507104&r1=1507103&r2=1507104&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java Thu Jul 25 19:38:49 2013 @@ -19,7 +19,9 @@ package org.apache.hadoop.mapreduce.task import java.io.IOException; +import java.net.InetAddress; import java.net.URI; +import java.net.UnknownHostException; import java.text.DecimalFormat; import java.util.ArrayList; import java.util.HashMap; @@ -252,6 +254,16 @@ public class ShuffleSchedulerImpl<K,V> i failedShuffleCounter.increment(1); } + + public void reportLocalError(IOException ioe) { + try { + LOG.error("Shuffle failed : local error on this node: " + + InetAddress.getLocalHost()); + } catch (UnknownHostException e) { + LOG.error("Shuffle failed : local error on this node"); + } + reporter.reportException(ioe); + } // Notify the JobTracker // after every read error, if 'reportReadErrorImmediately' is true or Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1507104&r1=1507103&r2=1507104&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Thu Jul 25 19:38:49 2013 @@ -58,6 +58,7 @@ import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -114,6 +115,36 @@ public class TestFetcher { LOG.info("<<<< " + name.getMethodName()); } + @Test + public void testReduceOutOfDiskSpace() throws Throwable { + LOG.info("testReduceOutOfDiskSpace"); + + Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm, + r, metrics, except, key, connection); + + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + header.write(new DataOutputStream(bout)); + + ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + 
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) + .thenReturn(replyHash); + when(connection.getInputStream()).thenReturn(in); + + when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) + .thenThrow(new DiskErrorException("No disk space available")); + + underTest.copyFromHost(host); + verify(ss).reportLocalError(any(IOException.class)); + } + @Test(timeout=30000) public void testCopyFromHostConnectionTimeout() throws Exception { when(connection.getInputStream()).thenThrow(