Author: jlowe
Date: Fri Jul 26 17:58:07 2013
New Revision: 1507384

URL: http://svn.apache.org/r1507384
Log:
MAPREDUCE-5251. Reducer should not implicate map attempt if it has insufficient space to fetch map output. Contributed by Ashwin Shankar.
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1507384&r1=1507383&r2=1507384&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jul 26 17:58:07 2013 @@ -21,6 +21,9 @@ Release 0.23.10 - UNRELEASED MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via jlowe) + MAPREDUCE-5251. 
Reducer should not implicate map attempt if it has + insufficient space to fetch map output (Ashwin Shankar via jlowe) + Release 0.23.9 - 2013-07-08 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1507384&r1=1507383&r2=1507384&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Jul 26 17:58:07 2013 @@ -338,7 +338,14 @@ class Fetcher<K,V> extends Thread { } // Get the location for the map output - either in-memory or on-disk - mapOutput = merger.reserve(mapId, decompressedLength, id); + try { + mapOutput = merger.reserve(mapId, decompressedLength, id); + } catch (IOException ioe) { + // kill this reduce attempt + ioErrs.increment(1); + scheduler.reportLocalError(ioe); + return EMPTY_ATTEMPT_ID_ARRAY; + } // Check if we can shuffle *now* ... 
if (mapOutput.getType() == Type.WAIT) { Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java?rev=1507384&r1=1507383&r2=1507384&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java Fri Jul 26 17:58:07 2013 @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.task.reduce; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.text.DecimalFormat; import java.util.ArrayList; import java.util.HashMap; @@ -301,6 +303,16 @@ class ShuffleScheduler<K,V> { host.addKnownMap(mapId); } + public void reportLocalError(IOException ioe) { + try { + LOG.error("Shuffle failed : local error on this node: " + + InetAddress.getLocalHost()); + } catch (UnknownHostException e) { + LOG.error("Shuffle failed : local error on this node"); + } + reporter.reportException(ioe); + } + public synchronized MapHost getHost() throws InterruptedException { while(pendingHosts.isEmpty()) { wait(); Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1507384&r1=1507383&r2=1507384&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Fri Jul 26 17:58:07 2013 @@ -42,6 +42,7 @@ import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.junit.Test; /** @@ -73,6 +74,56 @@ public class TestFetcher { } @SuppressWarnings("unchecked") + @Test + public void testReduceOutOfDiskSpace() throws Throwable { + LOG.info("testReduceOutOfDiskSpace"); + JobConf job = new JobConf(); + TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1"); + ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class); + MergeManager<Text, Text> mm = mock(MergeManager.class); + Reporter r = mock(Reporter.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); + ExceptionReporter except = mock(ExceptionReporter.class); + SecretKey key = JobTokenSecretManager.createSecretKey(new byte[] { 0, 0, 0, + 0 }); + HttpURLConnection connection = mock(HttpURLConnection.class); + + Counters.Counter allErrs = mock(Counters.Counter.class); + when(r.getCounter(anyString(), anyString())).thenReturn(allErrs); + + Fetcher<Text, Text> underTest = new FakeFetcher<Text, 
Text>(job, id, ss, + mm, r, metrics, except, key, connection); + + MapHost host = new MapHost("localhost", "http://localhost:8080/"); + ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1); + TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1"); + maps.add(map1ID); + TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1"); + maps.add(map2ID); + String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg="; + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + header.write(new DataOutputStream(bout)); + + ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); + + when(ss.getMapsForHost(host)).thenReturn(maps); + when(connection.getResponseCode()).thenReturn(200); + when( + connection + .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) + .thenReturn(replyHash); + when(connection.getInputStream()).thenReturn(in); + + when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())).thenThrow( + new DiskErrorException("No disk space available")); + + underTest.copyFromHost(host); + verify(ss).reportLocalError(any(IOException.class)); + } + + @SuppressWarnings("unchecked") @Test(timeout=30000) public void testCopyFromHostConnectionTimeout() throws Exception { LOG.info("testCopyFromHostConnectionTimeout");