Author: shv
Date: Sun Nov 20 22:02:26 2011
New Revision: 1204275

URL: http://svn.apache.org/viewvc?rev=1204275&view=rev
Log:
MAPREDUCE-2059. RecoveryManager excludes jobtracker.info from the list of
jobs to be recovered. Contributed by Subroto Sanyal.
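Background: when JT_RESTART_ENABLED is set, JobTracker start-up previously handed
every entry of fs.listStatus(systemDir) to recoveryManager.addJobForRecovery(), so
the restart-count file jobtracker.info (and its temporary counterpart, whose name
shares that prefix) was treated as a job to recover. The patch routes the listing
through a new helper, getJobFilesForRecovery(), whose PathFilter drops any entry
whose name starts with the restart-count file's name; a condensed sketch of the
patched scan follows the JobTracker.java diff below.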
Modified:
    hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt
    hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java

Modified: hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt?rev=1204275&r1=1204274&r2=1204275&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/CHANGES.txt Sun Nov 20 22:02:26 2011
@@ -641,6 +641,9 @@ Release 0.22.0 - Unreleased
     MAPREDUCE-3429. Few contrib tests are failing because of the missing
     commons-lang dependency (cos)
 
+    MAPREDUCE-2059. RecoveryManager excludes jobtracker.info from the list of
+    jobs to be recovered. (Subroto Sanyal via shv)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1204275&r1=1204274&r2=1204275&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JobTracker.java Sun Nov 20 22:02:26 2011
@@ -65,6 +65,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.Text;
@@ -1553,18 +1554,18 @@ public class JobTracker implements MRCon
       }
     } catch (FileNotFoundException fnf) {} //ignore
     // Make sure that the backup data is preserved
-    FileStatus[] systemDirData;
+    FileStatus[] jobDirData;
     try {
-      systemDirData = fs.listStatus(this.systemDir);
+      jobDirData = getJobFilesForRecovery(fs, systemDir);
     } catch (FileNotFoundException fnfe) {
-      systemDirData = null;
+      jobDirData = null;
     }
 
     // Check if the history is enabled .. as we can't have persistence with
     // history disabled
     if (conf.getBoolean(JT_RESTART_ENABLED, false)
-        && systemDirData != null) {
-      for (FileStatus status : systemDirData) {
+        && jobDirData != null) {
+      for (FileStatus status : jobDirData) {
         try {
           recoveryManager.addJobForRecovery(status);
         } catch (Throwable t) {
@@ -1643,6 +1644,23 @@ public class JobTracker implements MRCon
   }
 
   /**
+   * Expects Recovery Manager to be initialized
+   */
+  FileStatus[] getJobFilesForRecovery(FileSystem fs, Path jobFolderPath)
+      throws IOException {
+    return fs.listStatus(jobFolderPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        if (path.getName().startsWith(
+            recoveryManager.getRestartCountFile().getName())) {
+          return false;
+        }
+        return true;
+      }
+    });
+  }
+
+  /**
    * Recursively delete the contents of a directory without deleting the
    * directory itself.
    */
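Taken together, the hunks above change the recovery scan roughly as follows. This
is a condensed paraphrase of the patched start-up path, not a verbatim excerpt;
the surrounding try/catch and error handling are elided:

    // Condensed view of the patched recovery scan: the raw
    // fs.listStatus(systemDir) call is replaced by the filtered helper,
    // so the restart-count file never reaches addJobForRecovery().
    FileStatus[] jobDirData = getJobFilesForRecovery(fs, systemDir);
    if (conf.getBoolean(JT_RESTART_ENABLED, false) && jobDirData != null) {
      for (FileStatus status : jobDirData) {
        recoveryManager.addJobForRecovery(status);
      }
    }

Filtering at list time keeps the exclusion in one place, and because the check
uses startsWith() on the restart-count file's name, it also skips the temporary
restart file that shares the jobtracker.info prefix.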
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1204275&r1=1204274&r2=1204275&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java Sun Nov 20 22:02:26 2011
@@ -18,7 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.File;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
+
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
@@ -28,15 +29,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
-import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
 import org.apache.hadoop.mapreduce.MRConfig;
-
-import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesConfigFile;
-import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -85,8 +84,8 @@ public class TestRecoveryManager extends
     RunningJob rJob1 = (new JobClient(job1)).submitJob(job1);
     LOG.info("Submitted job " + rJob1.getID());
 
-    while (rJob1.mapProgress() < 0.5f) {
-      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+    while (rJob1.mapProgress() < 0.2f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 20% done");
       UtilsForTests.waitFor(100);
     }
 
@@ -100,8 +99,8 @@ public class TestRecoveryManager extends
     RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
     LOG.info("Submitted job " + rJob2.getID());
 
-    while (rJob2.mapProgress() < 0.5f) {
-      LOG.info("Waiting for job " + rJob2.getID() + " to be 50% done");
+    while (rJob2.mapProgress() < 0.2f) {
+      LOG.info("Waiting for job " + rJob2.getID() + " to be 20% done");
       UtilsForTests.waitFor(100);
     }
 
@@ -146,7 +145,7 @@ public class TestRecoveryManager extends
    * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown
   * during recovery. It does the following :
   *  - submits a job with HIGH priority and x tasks
-  *  - allows it to complete 50%
+  *  - allows it to complete 20%
   *  - submits another job with normal priority and y tasks
   *  - kills the jobtracker
   *  - restarts the jobtracker with max-tasks-per-job such that
@@ -181,8 +180,8 @@ public class TestRecoveryManager extends
     RunningJob rJob1 = jc.submitJob(job1);
     LOG.info("Submitted first job " + rJob1.getID());
 
-    while (rJob1.mapProgress() < 0.5f) {
-      LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
+    while (rJob1.mapProgress() < 0.2f) {
+      LOG.info("Waiting for job " + rJob1.getID() + " to be 20% done");
       UtilsForTests.waitFor(100);
     }
 
@@ -313,7 +312,7 @@ public class TestRecoveryManager extends
     assertFalse("Info file exists after update failure",
                 fs.exists(restartFile));
     assertFalse("Temporary restart-file exists after update failure",
-                fs.exists(restartFile));
+                fs.exists(tmpRestartFile));
 
     // start 1 data node
     dfs.startDataNodes(conf, 1, true, null, null, null, null);
@@ -326,5 +325,46 @@ public class TestRecoveryManager extends
       failed = true;
     }
     assertFalse("JobTracker failed to create info files with datanodes!!!", failed);
+    dfs.shutdown();
+  }
+
+  public void testRestartCountFileWillNotBeListedForRecovery() throws Exception {
+    MiniDFSCluster dfsCluster = null;
+    MiniMRCluster mrCluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean(JTConfig.JT_RESTART_ENABLED, true);
+      System
+          .setProperty("hadoop.log.dir", System.getProperty("java.io.tmpdir"));
+      dfsCluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
+      mrCluster =
+          new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(), 1);
+      JobTracker jobTracker = mrCluster.getJobTrackerRunner().getJobTracker();
+      Path jobFolderPath = new Path(jobTracker.getSystemDir());
+      FileSystem fileSystem = jobTracker.getFileSystem();
+      Path restartCountFile = jobTracker.recoveryManager.getRestartCountFile();
+      FSDataOutputStream restartFileStream = fileSystem
+          .create(restartCountFile);
+      restartFileStream.writeInt(0);
+      restartFileStream.close();
+      Path tempRestartFile = jobTracker.recoveryManager
+          .getTempRestartCountFile();
+      FSDataOutputStream tempRestartFileStream = fileSystem
+          .create(tempRestartFile);
+      tempRestartFileStream.writeInt(0);
+      tempRestartFileStream.close();
+      assertTrue("Restart Count File doesn't exist", fileSystem
+          .exists(restartCountFile));
+      assertTrue("Temp Restart Count File doesn't exist", fileSystem
+          .exists(tempRestartFile));
+      FileStatus[] jobFilesForRecovery = jobTracker.getJobFilesForRecovery(
+          fileSystem, jobFolderPath);
+      assertTrue("Restart count files were listed for recovery.",
+          0 == jobFilesForRecovery.length);
+    } finally {
+      if (mrCluster != null) mrCluster.shutdown();
+      if (dfsCluster != null) dfsCluster.shutdown();
+    }
   }
 }
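For a quick look at the filter's behaviour without MiniDFSCluster/MiniMRCluster,
a sketch along the following lines exercises the same check against the local
file system. The directory name and the jobtracker.info.recover name used for
the temporary file are illustrative assumptions, not values taken from this patch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Standalone sketch of the behaviour the new test asserts, run against
    // the local file system instead of a mini cluster. The directory and the
    // "jobtracker.info.recover" temp-file name are illustrative assumptions.
    public class RestartFileFilterSketch {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path systemDir = new Path(System.getProperty("java.io.tmpdir"), "jt-system");
        fs.mkdirs(systemDir);

        // Write the restart-count file and a temporary counterpart, as the
        // test does with writeInt(0).
        for (String name : new String[] { "jobtracker.info", "jobtracker.info.recover" }) {
          FSDataOutputStream out = fs.create(new Path(systemDir, name));
          out.writeInt(0);
          out.close();
        }

        // The same prefix filter that getJobFilesForRecovery() installs.
        FileStatus[] listed = fs.listStatus(systemDir, new PathFilter() {
          @Override
          public boolean accept(Path path) {
            return !path.getName().startsWith("jobtracker.info");
          }
        });

        // Both files share the excluded prefix, so nothing should be listed.
        System.out.println("Files listed for recovery: " + listed.length); // expect 0
      }
    }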