Author: szetszwo Date: Tue Sep 3 19:01:19 2013 New Revision: 1519796 URL: http://svn.apache.org/r1519796 Log: Merge r1517887 through r1518850 from trunk.
Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed) Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1517887-1518850 Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt?rev=1519796&r1=1519795&r2=1519796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt Tue Sep 3 19:01:19 2013 @@ -243,6 +243,12 @@ Release 2.1.1-beta - UNRELEASED MAPREDUCE-5476. 
Changed MR AM recovery code to cleanup staging-directory only after unregistering from the RM. (Jian He via vinodkv) + MAPREDUCE-5483. revert MAPREDUCE-5357. (rkanter via tucu) + + MAPREDUCE-5441. Changed MR AM to return RUNNING state if exiting when RM + commands to reboot, so that client can continue to track the overall job. + (Jian He via vinodkv) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1517887-1518850 Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1513717-1518850 Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1519796&r1=1519795&r2=1519796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Sep 3 19:01:19 2013 @@ -993,7 +993,7 @@ public class JobImpl implements org.apac } } - private static JobState getExternalState(JobStateInternal smState) { + private JobState 
getExternalState(JobStateInternal smState) { switch (smState) { case KILL_WAIT: case KILL_ABORT: @@ -1005,7 +1005,13 @@ public class JobImpl implements org.apac case FAIL_ABORT: return JobState.FAILED; case REBOOT: - return JobState.ERROR; + if (appContext.isLastAMRetry()) { + return JobState.ERROR; + } else { + // In case of not last retry, return the external state as RUNNING since + // otherwise JobClient will exit when it polls the AM for job state + return JobState.RUNNING; + } default: return JobState.valueOf(smState.name()); } Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1519796&r1=1519795&r2=1519796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Tue Sep 3 19:01:19 2013 @@ -29,6 +29,7 @@ import java.util.Iterator; import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; @@ -41,6 +42,8 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.Task; import 
org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; @@ -51,12 +54,15 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.Clock; import org.junit.Test; /** @@ -368,6 +374,47 @@ public class TestMRApp { app.waitForState(job, JobState.ERROR); } + @Test + public void testJobRebootNotLastRetry() throws Exception { + MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true); + Job job = app.submit(new Configuration()); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size()); + Iterator<Task> it = job.getTasks().values().iterator(); + Task task = it.next(); + app.waitForState(task, TaskState.RUNNING); + + //send a reboot event + app.getContext().getEventHandler().handle(new JobEvent(job.getID(), + JobEventType.JOB_AM_REBOOT)); + + // return external state as RUNNING since otherwise the JobClient will + // prematurely exit. 
+ app.waitForState(job, JobState.RUNNING); + } + + @Test + public void testJobRebootOnLastRetry() throws Exception { + // set startCount to 2 since this is the last retry, which equals + // DEFAULT_MAX_AM_RETRY + MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2); + + Configuration conf = new Configuration(); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size()); + Iterator<Task> it = job.getTasks().values().iterator(); + Task task = it.next(); + app.waitForState(task, TaskState.RUNNING); + + //send a reboot event + app.getContext().getEventHandler().handle(new JobEvent(job.getID(), + JobEventType.JOB_AM_REBOOT)); + + // return external state as ERROR if this is the last retry + app.waitForState(job, JobState.ERROR); + } + private final class MRAppWithSpiedJob extends MRApp { private JobImpl spiedJob; Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1519796&r1=1519795&r2=1519796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Sep 3 19:01:19 2013 @@ -142,7 +142,7 @@ public class TestJobImpl { "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ", 
"tag1,tag2"); dispatcher.register(EventType.class, jseHandler); - JobImpl job = createStubbedJob(conf, dispatcher, 0); + JobImpl job = createStubbedJob(conf, dispatcher, 0, null); job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); job.handle(new JobStartEvent(job.getID())); @@ -170,7 +170,7 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); - JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); completeJobTasks(job); assertJobState(job, JobStateInternal.COMMITTING); @@ -195,7 +195,7 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); - JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); completeJobTasks(job); assertJobState(job, JobStateInternal.COMMITTING); @@ -239,7 +239,9 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); - JobImpl job = createStubbedJob(conf, dispatcher, 2); + AppContext mockContext = mock(AppContext.class); + when(mockContext.isLastAMRetry()).thenReturn(false); + JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext); JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); @@ -248,6 +250,10 @@ public class TestJobImpl { job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); assertJobState(job, JobStateInternal.REBOOT); + // return the external state as RUNNING since otherwise JobClient will + // exit when it polls the AM for job state + Assert.assertEquals(JobState.RUNNING, job.getState()); + dispatcher.stop(); commitHandler.stop(); } @@ -256,6 +262,7 @@ public class TestJobImpl { public void testRebootedDuringCommit() throws Exception { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + 
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2); AsyncDispatcher dispatcher = new AsyncDispatcher(); dispatcher.init(conf); dispatcher.start(); @@ -266,13 +273,18 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); - JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + AppContext mockContext = mock(AppContext.class); + when(mockContext.isLastAMRetry()).thenReturn(true); + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext); completeJobTasks(job); assertJobState(job, JobStateInternal.COMMITTING); syncBarrier.await(); job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT)); assertJobState(job, JobStateInternal.REBOOT); + // return the external state as ERROR since this is the last retry. + Assert.assertEquals(JobState.ERROR, job.getState()); + dispatcher.stop(); commitHandler.stop(); } @@ -301,7 +313,7 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); - JobImpl job = createStubbedJob(conf, dispatcher, 2); + JobImpl job = createStubbedJob(conf, dispatcher, 2, null); JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); @@ -328,7 +340,7 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); - JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); completeJobTasks(job); assertJobState(job, JobStateInternal.COMMITTING); @@ -352,7 +364,7 @@ public class TestJobImpl { createCommitterEventHandler(dispatcher, committer); commitHandler.init(conf); commitHandler.start(); - JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); + JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null); //Fail one task. This should land the JobImpl in the FAIL_WAIT state job.handle(new JobTaskEvent( @@ -388,7 +400,7 @@ public class TestJobImpl { //Job has only 1 mapper task. 
No reducers conf.setInt(MRJobConfig.NUM_REDUCES, 0); conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1); - JobImpl job = createRunningStubbedJob(conf, dispatcher, 1); + JobImpl job = createRunningStubbedJob(conf, dispatcher, 1, null); //Fail / finish all the tasks. This should land the JobImpl directly in the //FAIL_ABORT state @@ -440,7 +452,7 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); - JobImpl job = createStubbedJob(conf, dispatcher, 2); + JobImpl job = createStubbedJob(conf, dispatcher, 2, null); JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); @@ -477,7 +489,7 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); - JobImpl job = createStubbedJob(conf, dispatcher, 2); + JobImpl job = createStubbedJob(conf, dispatcher, 2, null); JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); @@ -687,7 +699,7 @@ public class TestJobImpl { commitHandler.init(conf); commitHandler.start(); - JobImpl job = createStubbedJob(conf, dispatcher, 2); + JobImpl job = createStubbedJob(conf, dispatcher, 2, null); JobId jobId = job.getID(); job.handle(new JobEvent(jobId, JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); @@ -735,12 +747,12 @@ public class TestJobImpl { } private static StubbedJob createStubbedJob(Configuration conf, - Dispatcher dispatcher, int numSplits) { + Dispatcher dispatcher, int numSplits, AppContext appContext) { JobID jobID = JobID.forName("job_1234567890000_0001"); JobId jobId = TypeConverter.toYarn(jobID); StubbedJob job = new StubbedJob(jobId, ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0), - conf,dispatcher.getEventHandler(), true, "somebody", numSplits); + conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext); dispatcher.register(JobEventType.class, job); EventHandler mockHandler = 
mock(EventHandler.class); dispatcher.register(TaskEventType.class, mockHandler); @@ -751,8 +763,8 @@ public class TestJobImpl { } private static StubbedJob createRunningStubbedJob(Configuration conf, - Dispatcher dispatcher, int numSplits) { - StubbedJob job = createStubbedJob(conf, dispatcher, numSplits); + Dispatcher dispatcher, int numSplits, AppContext appContext) { + StubbedJob job = createStubbedJob(conf, dispatcher, numSplits, appContext); job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT)); assertJobState(job, JobStateInternal.INITED); job.handle(new JobStartEvent(job.getID())); @@ -880,13 +892,13 @@ public class TestJobImpl { } public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId, - Configuration conf, EventHandler eventHandler, - boolean newApiCommitter, String user, int numSplits) { + Configuration conf, EventHandler eventHandler, boolean newApiCommitter, + String user, int numSplits, AppContext appContext) { super(jobId, applicationAttemptId, conf, eventHandler, null, new JobTokenSecretManager(), new Credentials(), new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(), MRAppMetrics.create(), null, newApiCommitter, user, - System.currentTimeMillis(), null, null, null, null); + System.currentTimeMillis(), null, appContext, null, null); initTransition = getInitTransition(numSplits); localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW, Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=1519796&r1=1519795&r2=1519796&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Tue Sep 3 19:01:19 2013 @@ -124,7 +124,6 @@ public class JobSubmissionFiles { } else { fs.mkdirs(stagingArea, new FsPermission(JOB_DIR_PERMISSION)); - fs.setOwner(stagingArea, currentUser, null); } return stagingArea; } Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1513717-1518850