Author: sandy
Date: Mon Aug 5 17:12:58 2013
New Revision: 1510612

URL: http://svn.apache.org/r1510612
Log:
MAPREDUCE-5367. Local jobs all use same local working directory (Sandy Ryza)

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1510612&r1=1510611&r2=1510612&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Aug 5 17:12:58 2013 @@ -74,6 +74,9 @@ Release 2.1.1-beta - UNRELEASED MAPREDUCE-5440. TestCopyCommitter Fails on JDK7 (Robert Parker via jlowe) + MAPREDUCE-5367. Local jobs all use same local working directory + (Sandy Ryza) + Release 2.1.0-beta - 2013-08-06 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1510612&r1=1510611&r2=1510612&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Aug 5 17:12:58 2013 @@ -146,7 +146,9 @@ public class LocalJobRunner implements C this.id = jobid; JobConf conf = new 
JobConf(systemJobFile); this.localFs = FileSystem.getLocal(conf); - this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir)); + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + this.localJobDir = localFs.makeQualified(new Path( + new Path(conf.getLocalPath(jobDir), user), jobid.toString())); this.localJobFile = new Path(this.localJobDir, id + ".xml"); // Manage the distributed cache. If there are files to be copied, @@ -217,7 +219,7 @@ public class LocalJobRunner implements C info.getSplitIndex(), 1); map.setUser(UserGroupInformation.getCurrentUser(). getShortUserName()); - setupChildMapredLocalDirs(map, localConf); + setupChildMapredLocalDirs(localJobDir, map, localConf); MapOutputFile mapOutput = new MROutputFiles(); mapOutput.setConf(localConf); @@ -412,7 +414,7 @@ public class LocalJobRunner implements C getShortUserName()); JobConf localConf = new JobConf(job); localConf.set("mapreduce.jobtracker.address", "local"); - setupChildMapredLocalDirs(reduce, localConf); + setupChildMapredLocalDirs(localJobDir, reduce, localConf); // move map output to reduce input for (int i = 0; i < mapIds.size(); i++) { if (!this.isInterrupted()) { @@ -839,31 +841,27 @@ public class LocalJobRunner implements C throw new UnsupportedOperationException("Not supported"); } - static void setupChildMapredLocalDirs(Task t, JobConf conf) { + static void setupChildMapredLocalDirs(Path localJobDir, Task t, JobConf conf) { String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR); - String jobId = t.getJobID().toString(); String taskId = t.getTaskID().toString(); boolean isCleanup = t.isTaskCleanupTask(); - String user = t.getUser(); StringBuffer childMapredLocalDir = new StringBuffer(localDirs[0] + Path.SEPARATOR - + getLocalTaskDir(user, jobId, taskId, isCleanup)); + + getLocalTaskDir(localJobDir, taskId, isCleanup)); for (int i = 1; i < localDirs.length; i++) { childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR - + getLocalTaskDir(user, 
jobId, taskId, isCleanup)); + + getLocalTaskDir(localJobDir, taskId, isCleanup)); } LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir); conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString()); } static final String TASK_CLEANUP_SUFFIX = ".cleanup"; - static final String SUBDIR = jobDir; static final String JOBCACHE = "jobcache"; - static String getLocalTaskDir(String user, String jobid, String taskid, + static String getLocalTaskDir(Path localJobDir, String taskid, boolean isCleanupAttempt) { - String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE - + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid; + String taskDir = localJobDir.toString() + Path.SEPARATOR + taskid; if (isCleanupAttempt) { taskDir = taskDir + TASK_CLEANUP_SUFFIX; }