Author: kasha
Date: Wed Jul 16 21:33:30 2014
New Revision: 1611199

URL: http://svn.apache.org/r1611199
Log:
MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly
assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
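In short: MROutputFiles allocates file.out and file.out.index independently, so when
MRConfig.LOCAL_DIR lists more than one local directory the map output and its index can
land under different dirs. The old renameMapOutputForReduce derived the index path from
the output path by string concatenation, which only holds when both files share a parent
directory. The heart of the change (both lines appear verbatim in the diff below) is:

    // before: assumes the index sits next to file.out in the same local dir
    Path mapOutIndex = new Path(mapOut.toString() + ".index");

    // after: ask the MapOutputFile layer, which resolves the index's actual location
    Path mapOutIndex = subMapOutputFile.getOutputIndexFile();

The method also becomes protected static and @VisibleForTesting so the new unit test can
call it directly.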
Modified:
    hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
    hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java

Modified: hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/CHANGES.txt?rev=1611199&r1=1611198&r2=1611199&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/CHANGES.txt Wed Jul 16 21:33:30 2014
@@ -144,6 +144,9 @@ Release 2.5.0 - UNRELEASED
     resource configuration for deciding uber-mode on map-only jobs. (Siqi Li
     via vinodkv)
 
+    MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly
+    assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
+
 Release 2.4.1 - 2014-06-23
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1611199&r1=1611198&r2=1611199&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Wed Jul 16 21:33:30 2014
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
@@ -438,43 +439,6 @@ public class LocalContainerLauncher exte
     }
 
     /**
-     * Within the _local_ filesystem (not HDFS), all activity takes place within
-     * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
-     * and all sub-MapTasks create the same filename ("file.out"). Rename that
-     * to something unique (e.g., "map_0.out") to avoid collisions.
-     *
-     * Longer-term, we'll modify [something] to use TaskAttemptID-based
-     * filenames instead of "file.out". (All of this is entirely internal,
-     * so there are no particular compatibility issues.)
-     */
-    private MapOutputFile renameMapOutputForReduce(JobConf conf,
-        TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      // move map output to reduce input
-      Path mapOut = subMapOutputFile.getOutputFile();
-      FileStatus mStatus = localFs.getFileStatus(mapOut);
-      Path reduceIn = subMapOutputFile.getInputFileForWrite(
-          TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
-      Path mapOutIndex = new Path(mapOut.toString() + ".index");
-      Path reduceInIndex = new Path(reduceIn.toString() + ".index");
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Renaming map output file for task attempt "
-            + mapId.toString() + " from original location " + mapOut.toString()
-            + " to destination " + reduceIn.toString());
-      }
-      if (!localFs.mkdirs(reduceIn.getParent())) {
-        throw new IOException("Mkdirs failed to create "
-            + reduceIn.getParent().toString());
-      }
-      if (!localFs.rename(mapOut, reduceIn))
-        throw new IOException("Couldn't rename " + mapOut);
-      if (!localFs.rename(mapOutIndex, reduceInIndex))
-        throw new IOException("Couldn't rename " + mapOutIndex);
-
-      return new RenamedMapOutputFile(reduceIn);
-    }
-
-    /**
      * Also within the local filesystem, we need to restore the initial state
      * of the directory as much as possible. Compare current contents against
      * the saved original state and nuke everything that doesn't belong, with
@@ -506,7 +470,46 @@ public class LocalContainerLauncher exte
 
     }
   } // end EventHandler
-
+
+  /**
+   * Within the _local_ filesystem (not HDFS), all activity takes place within
+   * a subdir inside one of the LOCAL_DIRS
+   * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
+   * and all sub-MapTasks create the same filename ("file.out"). Rename that
+   * to something unique (e.g., "map_0.out") to avoid possible collisions.
+   *
+   * Longer-term, we'll modify [something] to use TaskAttemptID-based
+   * filenames instead of "file.out". (All of this is entirely internal,
+   * so there are no particular compatibility issues.)
+   */
+  @VisibleForTesting
+  protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
+      TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+    // move map output to reduce input
+    Path mapOut = subMapOutputFile.getOutputFile();
+    FileStatus mStatus = localFs.getFileStatus(mapOut);
+    Path reduceIn = subMapOutputFile.getInputFileForWrite(
+        TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
+    Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
+    Path reduceInIndex = new Path(reduceIn.toString() + ".index");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming map output file for task attempt "
+          + mapId.toString() + " from original location " + mapOut.toString()
+          + " to destination " + reduceIn.toString());
+    }
+    if (!localFs.mkdirs(reduceIn.getParent())) {
+      throw new IOException("Mkdirs failed to create "
+          + reduceIn.getParent().toString());
+    }
+    if (!localFs.rename(mapOut, reduceIn))
+      throw new IOException("Couldn't rename " + mapOut);
+    if (!localFs.rename(mapOutIndex, reduceInIndex))
+      throw new IOException("Couldn't rename " + mapOutIndex);
+
+    return new RenamedMapOutputFile(reduceIn);
+  }
+
   private static class RenamedMapOutputFile extends MapOutputFile {
     private Path path;
 
Modified: hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java?rev=1611199&r1=1611198&r2=1611199&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java (original)
+++ hadoop/common/branches/branch-2.5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestLocalContainerLauncher.java Wed Jul 16 21:33:30 2014
@@ -18,17 +18,26 @@ package org.apache.hadoop.mapred;
 
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -46,6 +55,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -53,6 +65,36 @@ import org.mockito.stubbing.Answer;
 public class TestLocalContainerLauncher {
   private static final Log LOG =
       LogFactory.getLog(TestLocalContainerLauncher.class);
+  private static File testWorkDir;
+  private static final String[] localDirs = new String[2];
+
+  private static void delete(File dir) throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
+    fs.delete(p, true);
+  }
+
+  @BeforeClass
+  public static void setupTestDirs() throws IOException {
+    testWorkDir = new File("target",
+        TestLocalContainerLauncher.class.getCanonicalName());
+    testWorkDir.delete();
+    testWorkDir.mkdirs();
+    testWorkDir = testWorkDir.getAbsoluteFile();
+    for (int i = 0; i < localDirs.length; i++) {
+      final File dir = new File(testWorkDir, "local-" + i);
+      dir.mkdirs();
+      localDirs[i] = dir.toString();
+    }
+  }
+
+  @AfterClass
+  public static void cleanupTestDirs() throws IOException {
+    if (testWorkDir != null) {
+      delete(testWorkDir);
+    }
+  }
 
   @SuppressWarnings("rawtypes")
   @Test(timeout=10000)
@@ -141,4 +183,35 @@ public class TestLocalContainerLauncher
     when(container.getNodeId()).thenReturn(nodeId);
     return container;
   }
+
+
+  @Test
+  public void testRenameMapOutputForReduce() throws Exception {
+    final JobConf conf = new JobConf();
+
+    final MROutputFiles mrOutputFiles = new MROutputFiles();
+    mrOutputFiles.setConf(conf);
+
+    // make sure both dirs are distinct
+    //
+    conf.set(MRConfig.LOCAL_DIR, localDirs[0].toString());
+    final Path mapOut = mrOutputFiles.getOutputFileForWrite(1);
+    conf.set(MRConfig.LOCAL_DIR, localDirs[1].toString());
+    final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1);
+    Assert.assertNotEquals("Paths must be different!",
+        mapOut.getParent(), mapOutIdx.getParent());
+
+    // make both dirs part of LOCAL_DIR
+    conf.setStrings(MRConfig.LOCAL_DIR, localDirs);
+
+    final FileContext lfc = FileContext.getLocalFSFileContext(conf);
+    lfc.create(mapOut, EnumSet.of(CREATE)).close();
+    lfc.create(mapOutIdx, EnumSet.of(CREATE)).close();
+
+    final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2);
+    final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+    final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0);
+
+    LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles);
+  }
 }
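The new test can be run on its own in the hadoop-mapreduce-client-app module with the
usual surefire single-test switch (e.g. -Dtest=TestLocalContainerLauncher). For readers
without a Hadoop build handy, the sketch below reproduces the failure mode the test
exercises using only the JDK; the class name, directory layout, and file placement are
illustrative stand-ins for what Hadoop's local-dir allocation does, not Hadoop APIs.

    // IndexPathSketch.java -- illustrative only, not part of the patch.
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class IndexPathSketch {
      public static void main(String[] args) throws IOException {
        // Two local dirs, like the two entries the test puts in MRConfig.LOCAL_DIR.
        Path[] localDirs = { Files.createTempDirectory("local-0"),
                             Files.createTempDirectory("local-1") };

        // Suppose the map output and its index were allocated to different local dirs,
        // which is exactly the situation testRenameMapOutputForReduce sets up.
        Path mapOut = Files.createFile(localDirs[0].resolve("file.out"));
        Path actualIndex = Files.createFile(localDirs[1].resolve("file.out.index"));

        // Old logic: derive the index path from the output path by concatenation.
        Path assumedIndex = Paths.get(mapOut.toString() + ".index");

        // The assumed path does not exist, so the old rename of the index failed;
        // the real index lives under the other local dir.
        System.out.println("assumed index exists? " + Files.exists(assumedIndex)); // false
        System.out.println("actual index exists?  " + Files.exists(actualIndex));  // true
      }
    }

The committed fix sidesteps the concatenation entirely by asking
MapOutputFile#getOutputIndexFile() for the index's real location before the rename.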