Author: jlowe
Date: Tue Jul 16 15:13:51 2013
New Revision: 1503751

URL: http://svn.apache.org/r1503751
Log:
Revert change 1503506 for MAPREDUCE-5317. Stale files left behind for failed jobs

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1503751&r1=1503750&r2=1503751&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Jul 16 15:13:51 2013 @@ -15,9 +15,6 @@ Release 0.23.10 - UNRELEASED MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the input path dir (Devaraj K via jlowe) - MAPREDUCE-5317. 
Stale files left behind for failed jobs (Ravi Prakash via - jlowe) - Release 0.23.9 - 2013-07-08 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java?rev=1503751&r1=1503750&r2=1503751&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java Tue Jul 16 15:13:51 2013 @@ -25,7 +25,6 @@ public enum JobStateInternal { RUNNING, COMMITTING, SUCCEEDED, - FAIL_WAIT, FAIL_ABORT, FAILED, KILL_WAIT, Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1503751&r1=1503750&r2=1503751&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original) +++ 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Tue Jul 16 15:13:51 2013 @@ -44,7 +44,6 @@ public enum JobEventType { //Producer:Job JOB_COMPLETED, - JOB_FAIL_WAIT_TIMEDOUT, //Producer:Any component JOB_DIAGNOSTIC_UPDATE, Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1503751&r1=1503750&r2=1503751&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Jul 16 15:13:51 2013 @@ -29,9 +29,6 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -283,8 +280,7 @@ public class JobImpl implements org.apac .addTransition (JobStateInternal.RUNNING, EnumSet.of(JobStateInternal.RUNNING, - JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT, - JobStateInternal.FAIL_ABORT), + JobStateInternal.COMMITTING, 
JobStateInternal.FAIL_ABORT), JobEventType.JOB_TASK_COMPLETED, new TaskCompletedTransition()) .addTransition @@ -379,35 +375,7 @@ public class JobImpl implements org.apac EnumSet.of(JobEventType.JOB_KILL, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) - // Transitions from FAIL_WAIT state - .addTransition(JobStateInternal.FAIL_WAIT, - JobStateInternal.FAIL_WAIT, - JobEventType.JOB_DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(JobStateInternal.FAIL_WAIT, - JobStateInternal.FAIL_WAIT, - JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) - .addTransition(JobStateInternal.FAIL_WAIT, - EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT), - JobEventType.JOB_TASK_COMPLETED, - new JobFailWaitTransition()) - .addTransition(JobStateInternal.FAIL_WAIT, - JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT, - new JobFailWaitTimedOutTransition()) - .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED, - JobEventType.JOB_KILL, - new KilledDuringAbortTransition()) - .addTransition(JobStateInternal.FAIL_WAIT, - JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - // Ignore-able events - .addTransition(JobStateInternal.FAIL_WAIT, - JobStateInternal.FAIL_WAIT, - EnumSet.of(JobEventType.JOB_TASK_ATTEMPT_COMPLETED, - JobEventType.JOB_MAP_TASK_RESCHEDULED, - JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE)) - - //Transitions from FAIL_ABORT state + // Transitions from FAIL_ABORT state .addTransition(JobStateInternal.FAIL_ABORT, JobStateInternal.FAIL_ABORT, JobEventType.JOB_DIAGNOSTIC_UPDATE, @@ -553,10 +521,6 @@ public class JobImpl implements org.apac private JobStateInternal forcedState = null; - //Executor used for running future tasks. 
Setting thread pool size to 1 - private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - private ScheduledFuture failWaitTriggerScheduledFuture; - public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, @@ -917,7 +881,6 @@ public class JobImpl implements org.apac case SETUP: case COMMITTING: return JobState.RUNNING; - case FAIL_WAIT: case FAIL_ABORT: return JobState.FAILED; default: @@ -1442,43 +1405,7 @@ public class JobImpl implements org.apac job.unsuccessfulFinish(finalState); } } - - //This transition happens when a job is to be failed. It waits for all the - //tasks to finish / be killed. - private static class JobFailWaitTransition - implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> { - @Override - public JobStateInternal transition(JobImpl job, JobEvent event) { - if(!job.failWaitTriggerScheduledFuture.isCancelled()) { - for(Task task: job.tasks.values()) { - if(!task.isFinished()) { - return JobStateInternal.FAIL_WAIT; - } - } - } - //Finished waiting. All tasks finished / were killed - job.failWaitTriggerScheduledFuture.cancel(false); - job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, - job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); - return JobStateInternal.FAIL_ABORT; - } - } - - //This transition happens when a job to be failed times out while waiting on - //tasks that had been sent the KILL signal. It is triggered by a - //ScheduledFuture task queued in the executor. - private static class JobFailWaitTimedOutTransition - implements SingleArcTransition<JobImpl, JobEvent> { - @Override - public void transition(JobImpl job, JobEvent event) { - LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed." 
- + " Going to fail job anyway"); - job.failWaitTriggerScheduledFuture.cancel(false); - job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, - job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); - } - } - + // JobFinishedEvent triggers the move of the history file out of the staging // area. May need to create a new event type for this if JobFinished should // not be generated for KilledJobs, etc. @@ -1689,23 +1616,6 @@ public class JobImpl implements org.apac return checkJobAfterTaskCompletion(job); } - //This class is used to queue a ScheduledFuture to send an event to a job - //after some delay. This can be used to wait for maximum amount of time - //before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for - //all tasks to be killed. - static class TriggerScheduledFuture implements Runnable { - JobEvent toSend; - JobImpl job; - TriggerScheduledFuture(JobImpl job, JobEvent toSend) { - this.toSend = toSend; - this.job = job; - } - public void run() { - LOG.info("Sending event " + toSend + " to " + job.getID()); - job.getEventHandler().handle(toSend); - } - } - protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) { //check for Job failure if (job.failedMapTaskCount*100 > @@ -1719,30 +1629,10 @@ public class JobImpl implements org.apac " failedReduces:" + job.failedReduceTaskCount; LOG.info(diagnosticMsg); job.addDiagnostic(diagnosticMsg); - - //Send kill signal to all unfinished tasks here. 
- boolean allDone = true; - for (Task task : job.tasks.values()) { - if(!task.isFinished()) { - allDone = false; - job.eventHandler.handle( - new TaskEvent(task.getID(), TaskEventType.T_KILL)); - } - } - - //If all tasks are already done, we should go directly to FAIL_ABORT - if(allDone) { - return JobStateInternal.FAIL_ABORT; - } - - //Set max timeout to wait for the tasks to get killed - job.failWaitTriggerScheduledFuture = job.executor.schedule( - new TriggerScheduledFuture(job, new JobEvent(job.getID(), - JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt( - MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, - MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS), - TimeUnit.MILLISECONDS); - return JobStateInternal.FAIL_WAIT; + job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId, + job.jobContext, + org.apache.hadoop.mapreduce.JobStatus.State.FAILED)); + return JobStateInternal.FAIL_ABORT; } return job.checkReadyForCommit(); Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1503751&r1=1503750&r2=1503751&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Jul 16 15:13:51 2013 @@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.api.record import 
org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -80,7 +79,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.Mockito; /** @@ -241,39 +239,6 @@ public class TestJobImpl { commitHandler.stop(); } - @Test - public void testAbortJobCalledAfterKillingTasks() throws IOException, - InterruptedException { - Configuration conf = new Configuration(); - conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); - conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000"); - InlineDispatcher dispatcher = new InlineDispatcher(); - dispatcher.init(conf); - dispatcher.start(); - OutputCommitter committer = Mockito.mock(OutputCommitter.class); - CommitterEventHandler commitHandler = - createCommitterEventHandler(dispatcher, committer); - commitHandler.init(conf); - commitHandler.start(); - JobImpl job = createRunningStubbedJob(conf, dispatcher, 2); - - //Fail one task. 
This should land the JobImpl in the FAIL_WAIT state - job.handle(new JobTaskEvent( - MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP), - TaskState.FAILED)); - //Verify abort job hasn't been called - Mockito.verify(committer, Mockito.never()) - .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); - assertJobState(job, JobStateInternal.FAIL_WAIT); - - //Verify abortJob is called once and the job failed - Mockito.verify(committer, Mockito.timeout(2000).times(1)) - .abortJob((JobContext) Mockito.any(), (State) Mockito.any()); - assertJobState(job, JobStateInternal.FAILED); - - dispatcher.stop(); - } - @Test(timeout=20000) public void testKilledDuringFailAbort() throws Exception { Configuration conf = new Configuration();