Author: szetszwo
Date: Tue Nov 13 20:21:39 2012
New Revision: 1408938

URL: http://svn.apache.org/viewvc?rev=1408938&view=rev
Log:
Merge r1407704 through r1408926 from trunk.
Added:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/resources/
      - copied from r1408926, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/resources/
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/resources/css/
      - copied from r1408926, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/resources/css/
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/resources/css/site.css
      - copied unchanged from r1408926, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/resources/css/site.css
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/site.xml
      - copied unchanged from r1408926, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/site.xml

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1407704-1408926

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1408938&r1=1408937&r2=1408938&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Tue Nov 13 20:21:39 2012
@@ -650,6 +650,16 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
     state (jlowe via bobby)
 
+    MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
+
+    MAPREDUCE-4787. TestJobMonitorAndPrint is broken (Rob Parker via bobby)
+
+    MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe
+    via bobby)
+
+    MAPREDUCE-4786. Job End Notification retry interval is 5 milliseconds by
+    default (Ravi Prakash via bobby)
+
 Release 0.23.4 - UNRELEASED

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1407704-1408926

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1407704-1408926

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java?rev=1408938&r1=1408937&r2=1408938&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java Tue Nov 13 20:21:39 2012
@@ -53,7 +53,7 @@ public class JobEndNotifier implements C
   protected String userUrl;
   protected String proxyConf;
   protected int numTries; //Number of tries to attempt notification
-  protected int waitInterval; //Time to wait between retrying notification
+  protected int waitInterval; //Time (ms) to wait between retrying notification
   protected URL urlToNotify; //URL to notify read from the config
   protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
 
@@ -71,10 +71,10 @@ public class JobEndNotifier implements C
       , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, 1)
     );
     waitInterval = Math.min(
-      conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5)
-      , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5)
+      conf.getInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 5000)
+      , conf.getInt(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, 5000)
     );
-    waitInterval = (waitInterval < 0) ? 5 : waitInterval;
+    waitInterval = (waitInterval < 0) ? 5000 : waitInterval;
 
     userUrl = conf.get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL);
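
[Editor's note] The MAPREDUCE-4786 hunk above fixes the retry interval default from 5 ms to 5000 ms and records the unit in the comment. The interval logic reduces to: cap the configured interval at the administrator-set maximum, then fall back to the default when the result is negative. A minimal sketch of that logic in plain Java (the class and method names here are illustrative; the real code reads MRJobConfig keys from a Hadoop Configuration):

    public final class WaitIntervalSketch {
      /**
       * Mirrors the capping in JobEndNotifier.setConf: the user-supplied
       * interval is capped by the cluster maximum, and negative results
       * fall back to the 5000 ms default.
       */
      static int effectiveWaitInterval(int configuredMs, int maxMs) {
        int waitInterval = Math.min(configuredMs, maxMs);
        return (waitInterval < 0) ? 5000 : waitInterval;
      }

      public static void main(String[] args) {
        System.out.println(effectiveWaitInterval(1000, 5000));  // 1000
        System.out.println(effectiveWaitInterval(10000, 5000)); // capped to 5000
        System.out.println(effectiveWaitInterval(-10, 5000));   // default 5000
      }
    }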
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1408938&r1=1408937&r2=1408938&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Nov 13 20:21:39 2012
@@ -712,7 +712,10 @@ public class JobImpl implements org.apac
    * The only entry point to change the Job.
    */
   public void handle(JobEvent event) {
-    LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getJobId() + " of type "
+          + event.getType());
+    }
     try {
       writeLock.lock();
       JobStateInternal oldState = getInternalState();
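
[Editor's note] The JobImpl change is the standard guarded-logging idiom: the string concatenation in the debug message is only paid for when debug output is actually enabled, which matters on the AM's hot event-handling path. A minimal sketch with commons-logging (the logging API Hadoop used at the time); the class and message here are illustrative only:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class GuardedLoggingSketch {
      private static final Log LOG = LogFactory.getLog(GuardedLoggingSketch.class);

      void handle(Object event) {
        // Without the guard, the message string is built on every call,
        // even when debug output is discarded. The guard skips that cost.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processing " + event + " of type " + event.getClass());
        }
      }
    }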
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1408938&r1=1408937&r2=1408938&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Nov 13 20:21:39 2012
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -118,9 +120,18 @@ public abstract class TaskImpl implement
   protected Credentials credentials;
   protected Token<JobTokenIdentifier> jobToken;
 
+  //should be set to one which comes first
+  //saying COMMIT_PENDING
+  private TaskAttemptId commitAttempt;
+
+  private TaskAttemptId successfulAttempt;
+
+  private final Set<TaskAttemptId> failedAttempts;
+  // Track the finished attempts - successful, failed and killed
+  private final Set<TaskAttemptId> finishedAttempts;
   // counts the number of attempts that are either running or in a state where
   // they will come to be running when they get a Container
-  private int numberUncompletedAttempts = 0;
+  private final Set<TaskAttemptId> inProgressAttempts;
 
   private boolean historyTaskStartGenerated = false;
 
@@ -182,6 +193,14 @@ public abstract class TaskImpl implement
         EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
         TaskEventType.T_ATTEMPT_KILLED,
         new KillWaitAttemptKilledTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+        TaskEventType.T_ATTEMPT_SUCCEEDED,
+        new KillWaitAttemptSucceededTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+        TaskEventType.T_ATTEMPT_FAILED,
+        new KillWaitAttemptFailedTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.KILL_WAIT,
@@ -189,8 +208,6 @@ public abstract class TaskImpl implement
         EnumSet.of(TaskEventType.T_KILL,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
-            TaskEventType.T_ATTEMPT_FAILED,
-            TaskEventType.T_ATTEMPT_SUCCEEDED,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from SUCCEEDED state
@@ -200,13 +217,15 @@ public abstract class TaskImpl implement
     .addTransition(TaskStateInternal.SUCCEEDED,
        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
        TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition())
+    .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+        TaskEventType.T_ATTEMPT_SUCCEEDED,
+        new AttemptSucceededAtSucceededTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
         EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
             TaskEventType.T_ATTEMPT_LAUNCHED,
-            TaskEventType.T_ATTEMPT_SUCCEEDED,
             TaskEventType.T_KILL))
 
     // Transitions from FAILED state
@@ -242,15 +261,6 @@ public abstract class TaskImpl implement
   private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
       new RecoverdAttemptsComparator();
 
-  //should be set to one which comes first
-  //saying COMMIT_PENDING
-  private TaskAttemptId commitAttempt;
-
-  private TaskAttemptId successfulAttempt;
-
-  private int failedAttempts;
-  private int finishedAttempts;//finish are total of success, failed and killed
-
   @Override
   public TaskState getState() {
     readLock.lock();
@@ -275,6 +285,9 @@ public abstract class TaskImpl implement
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
     this.attempts = Collections.emptyMap();
+    this.finishedAttempts = new HashSet<TaskAttemptId>(2);
+    this.failedAttempts = new HashSet<TaskAttemptId>(2);
+    this.inProgressAttempts = new HashSet<TaskAttemptId>(2);
     // This overridable method call is okay in a constructor because we
     // have a convention that none of the overrides depends on any
     // fields that need initialization.
@@ -611,9 +624,9 @@ public abstract class TaskImpl implement
           taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
     }
 
-    ++numberUncompletedAttempts;
+    inProgressAttempts.add(attempt.getID());
     //schedule the nextAttemptNumber
-    if (failedAttempts > 0) {
+    if (failedAttempts.size() > 0) {
       eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
           TaskAttemptEventType.TA_RESCHEDULE));
     } else {
@@ -788,12 +801,14 @@ public abstract class TaskImpl implement
       implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
+      TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event;
+      TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID();
       task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(),
+          taskAttemptId,
           TaskAttemptCompletionEventStatus.SUCCEEDED);
-      task.finishedAttempts++;
-      --task.numberUncompletedAttempts;
-      task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
+      task.finishedAttempts.add(taskAttemptId);
+      task.inProgressAttempts.remove(taskAttemptId);
+      task.successfulAttempt = taskAttemptId;
       task.eventHandler.handle(new JobTaskEvent(
           task.taskId, TaskState.SUCCEEDED));
       LOG.info("Task succeeded with attempt " + task.successfulAttempt);
@@ -824,11 +839,13 @@ public abstract class TaskImpl implement
       SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
+      TaskAttemptId taskAttemptId =
+          ((TaskTAttemptEvent) event).getTaskAttemptID();
       task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(),
+          taskAttemptId,
           TaskAttemptCompletionEventStatus.KILLED);
-      task.finishedAttempts++;
-      --task.numberUncompletedAttempts;
+      task.finishedAttempts.add(taskAttemptId);
+      task.inProgressAttempts.remove(taskAttemptId);
       if (task.successfulAttempt == null) {
         task.addAndScheduleAttempt();
       }
@@ -840,15 +857,25 @@ public abstract class TaskImpl implement
       MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     protected TaskStateInternal finalState = TaskStateInternal.KILLED;
+    protected final TaskAttemptCompletionEventStatus taCompletionEventStatus;
+
+    public KillWaitAttemptKilledTransition() {
+      this(TaskAttemptCompletionEventStatus.KILLED);
+    }
+
+    public KillWaitAttemptKilledTransition(
+        TaskAttemptCompletionEventStatus taCompletionEventStatus) {
+      this.taCompletionEventStatus = taCompletionEventStatus;
+    }
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(),
-          TaskAttemptCompletionEventStatus.KILLED);
-      task.finishedAttempts++;
+      TaskAttemptId taskAttemptId =
+          ((TaskTAttemptEvent) event).getTaskAttemptID();
+      task.handleTaskAttemptCompletion(taskAttemptId, taCompletionEventStatus);
+      task.finishedAttempts.add(taskAttemptId);
       // check whether all attempts are finished
-      if (task.finishedAttempts == task.attempts.size()) {
+      if (task.finishedAttempts.size() == task.attempts.size()) {
         if (task.historyTaskStartGenerated) {
           TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
               finalState, null); // TODO JH verify failedAttempt null
@@ -867,43 +894,57 @@ public abstract class TaskImpl implement
     }
   }
 
+  private static class KillWaitAttemptSucceededTransition extends
+      KillWaitAttemptKilledTransition {
+    public KillWaitAttemptSucceededTransition() {
+      super(TaskAttemptCompletionEventStatus.SUCCEEDED);
+    }
+  }
+
+  private static class KillWaitAttemptFailedTransition extends
+      KillWaitAttemptKilledTransition {
+    public KillWaitAttemptFailedTransition() {
+      super(TaskAttemptCompletionEventStatus.FAILED);
+    }
+  }
+
   private static class AttemptFailedTransition implements
     MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      task.failedAttempts++;
       TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-      if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
+      TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
+      task.failedAttempts.add(taskAttemptId);
+      if (taskAttemptId.equals(task.commitAttempt)) {
         task.commitAttempt = null;
       }
-      TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
+      TaskAttempt attempt = task.attempts.get(taskAttemptId);
       if (attempt.getAssignedContainerMgrAddress() != null) {
         //container was assigned
         task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(),
            attempt.getAssignedContainerMgrAddress()));
       }
 
-      task.finishedAttempts++;
-      if (task.failedAttempts < task.maxAttempts) {
+      task.finishedAttempts.add(taskAttemptId);
+      if (task.failedAttempts.size() < task.maxAttempts) {
         task.handleTaskAttemptCompletion(
-            ((TaskTAttemptEvent) event).getTaskAttemptID(),
+            taskAttemptId,
             TaskAttemptCompletionEventStatus.FAILED);
         // we don't need a new event if we already have a spare
-        if (--task.numberUncompletedAttempts == 0
+        task.inProgressAttempts.remove(taskAttemptId);
+        if (task.inProgressAttempts.size() == 0
             && task.successfulAttempt == null) {
           task.addAndScheduleAttempt();
         }
       } else {
         task.handleTaskAttemptCompletion(
-            ((TaskTAttemptEvent) event).getTaskAttemptID(),
+            taskAttemptId,
             TaskAttemptCompletionEventStatus.TIPFAILED);
-        TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
-        TaskAttemptId taId = ev.getTaskAttemptID();
 
         if (task.historyTaskStartGenerated) {
         TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
-            TaskStateInternal.FAILED, taId);
+            TaskStateInternal.FAILED, taskAttemptId);
         task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
             taskFailedEvent));
         } else {
@@ -927,14 +968,14 @@ public abstract class TaskImpl implement
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      if (event instanceof TaskTAttemptEvent) {
-        TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-        if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
-            !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
-          // don't allow a different task attempt to override a previous
-          // succeeded state
-          return TaskStateInternal.SUCCEEDED;
-        }
+      TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
+          !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+        // don't allow a different task attempt to override a previous
+        // succeeded state
+        task.finishedAttempts.add(castEvent.getTaskAttemptID());
+        task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
+        return TaskStateInternal.SUCCEEDED;
       }
 
       // a successful REDUCE task should not be overridden
@@ -953,7 +994,7 @@ public abstract class TaskImpl implement
       // believe that there's no redundancy.
       unSucceed(task);
       // fake increase in Uncomplete attempts for super.transition
-      ++task.numberUncompletedAttempts;
+      task.inProgressAttempts.add(castEvent.getTaskAttemptID());
       return super.transition(task, event);
     }
 
@@ -976,6 +1017,8 @@ public abstract class TaskImpl implement
             !attemptId.equals(task.successfulAttempt)) {
           // don't allow a different task attempt to override a previous
           // succeeded state
+          task.finishedAttempts.add(castEvent.getTaskAttemptID());
+          task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
           return TaskStateInternal.SUCCEEDED;
         }
       }
@@ -1006,6 +1049,16 @@ public abstract class TaskImpl implement
     }
   }
 
+  private static class AttemptSucceededAtSucceededTransition
+      implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      task.finishedAttempts.add(castEvent.getTaskAttemptID());
+      task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
+    }
+  }
+
   private static class KillNewTransition
     implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
@@ -1045,7 +1098,7 @@ public abstract class TaskImpl implement
             (attempt, "Task KILL is received. Killing attempt!");
       }
 
-      task.numberUncompletedAttempts = 0;
+      task.inProgressAttempts.clear();
     }
   }
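
[Editor's note] The thread running through the TaskImpl diff is the replacement of raw counters (failedAttempts, finishedAttempts, numberUncompletedAttempts) with Set<TaskAttemptId> collections. A set is idempotent: receiving the same completion event twice for one attempt cannot push the bookkeeping out of sync with the real attempts, which is exactly the class of miscount behind an AM stuck in KILL_WAIT (MAPREDUCE-4751). A self-contained sketch of the difference (toy ids, plain Java):

    import java.util.HashSet;
    import java.util.Set;

    public class AttemptTrackingSketch {
      public static void main(String[] args) {
        // Counter-based tracking: a duplicate completion event for the same
        // attempt is counted twice, so "all attempts finished" can hold too
        // early or never, and the task waits forever in KILL_WAIT.
        int finishedCount = 0;
        finishedCount++; // T_ATTEMPT_SUCCEEDED for attempt_0
        finishedCount++; // stray duplicate event for attempt_0
        System.out.println("counter says finished: " + finishedCount); // 2 (wrong)

        // Set-based tracking: adding the same id twice is a no-op, so the
        // completeness check compares real attempt ids, not event arrivals.
        Set<String> finishedAttempts = new HashSet<String>();
        finishedAttempts.add("attempt_0"); // T_ATTEMPT_SUCCEEDED
        finishedAttempts.add("attempt_0"); // duplicate event, absorbed
        System.out.println("set says finished: " + finishedAttempts.size()); // 1
      }
    }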
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1408938&r1=1408937&r2=1408938&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Nov 13 20:21:39 2012
@@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@@ -63,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -243,6 +245,39 @@ public class MRApp extends MRAppMaster {
     return job;
   }
 
+  public void waitForInternalState(JobImpl job,
+      JobStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    JobStateInternal iState = job.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("Job Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState);
+      Thread.sleep(500);
+      iState = job.getInternalState();
+    }
+    System.out.println("Task Internal State is : " + iState);
+    Assert.assertEquals("Task Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
+  public void waitForInternalState(TaskImpl task,
+      TaskStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskReport report = task.getReport();
+    TaskStateInternal iState = task.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("Task Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState
+          + " progress : " + report.getProgress());
+      Thread.sleep(500);
+      report = task.getReport();
+      iState = task.getInternalState();
+    }
+    System.out.println("Task Internal State is : " + iState);
+    Assert.assertEquals("Task Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
   public void waitForInternalState(TaskAttemptImpl attempt,
       TaskAttemptStateInternal finalState) throws Exception {
     int timeoutSecs = 0;
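
[Editor's note] Both new helpers are instances of the same poll-until-state test pattern: sample the state, sleep, and fail after a bounded number of rounds. The pattern in generic form (hypothetical helper, not part of MRApp; JUnit's Assert.fail is the only dependency):

    import static org.junit.Assert.fail;

    public final class WaitForSketch {
      interface StateSource<T> { T get(); }

      static <T> void waitForState(StateSource<T> current, T expected)
          throws InterruptedException {
        // Poll every 500 ms; give up after 20 rounds (~10 seconds), mirroring
        // the timeoutSecs++ < 20 loops in the MRApp helpers above.
        for (int round = 0;
            round < 20 && !expected.equals(current.get()); round++) {
          Thread.sleep(500);
        }
        if (!expected.equals(current.get())) {
          fail("Did not reach " + expected + " before timeout, was "
              + current.get());
        }
      }
    }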
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java?rev=1408938&r1=1408937&r2=1408938&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java Tue Nov 13 20:21:39 2012
@@ -55,22 +55,22 @@ public class TestJobEndNotifier extends
   //Test maximum retry interval is capped by
   //MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL
   private void testWaitInterval(Configuration conf) {
-    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "5");
-    conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "1");
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "5000");
+    conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "1000");
     setConf(conf);
-    Assert.assertTrue("Expected waitInterval to be 1, but was " + waitInterval,
-      waitInterval == 1);
+    Assert.assertTrue("Expected waitInterval to be 1000, but was "
+      + waitInterval, waitInterval == 1000);
 
-    conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "10");
+    conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "10000");
     setConf(conf);
-    Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval,
-      waitInterval == 5);
+    Assert.assertTrue("Expected waitInterval to be 5000, but was "
+      + waitInterval, waitInterval == 5000);
 
     //Test negative numbers are set to default
     conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "-10");
     setConf(conf);
-    Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval,
-      waitInterval == 5);
+    Assert.assertTrue("Expected waitInterval to be 5000, but was "
+      + waitInterval, waitInterval == 5000);
   }
 
   private void testProxyConfiguration(Configuration conf) {
@@ -125,17 +125,28 @@ public class TestJobEndNotifier extends
   public void testNotifyRetries() throws InterruptedException {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
+    JobReport jobReport = Mockito.mock(JobReport.class);
+
+    long startTime = System.currentTimeMillis();
+    this.notificationCount = 0;
+    this.setConf(conf);
+    this.notify(jobReport);
+    long endTime = System.currentTimeMillis();
+    Assert.assertEquals("Only 1 try was expected but was : "
+        + this.notificationCount, this.notificationCount, 1);
+    Assert.assertTrue("Should have taken more than 5 seconds it took "
+        + (endTime - startTime), endTime - startTime > 5000);
+
     conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
     conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
     conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "3000");
     conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "3000");
-    JobReport jobReport = Mockito.mock(JobReport.class);
 
-    long startTime = System.currentTimeMillis();
+    startTime = System.currentTimeMillis();
     this.notificationCount = 0;
     this.setConf(conf);
     this.notify(jobReport);
-    long endTime = System.currentTimeMillis();
+    endTime = System.currentTimeMillis();
     Assert.assertEquals("Only 3 retries were expected but was : "
         + this.notificationCount, this.notificationCount, 3);
     Assert.assertTrue("Should have taken more than 9 seconds it took "
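
[Editor's note] The timing assertions in testNotifyRetries rest on simple arithmetic: with the new 5000 ms default, a single failed try against an unreachable URL must take over 5 seconds, and 3 retries at 3000 ms each must take over 9 seconds. A sketch of that lower bound (hypothetical helper; this assumes the notifier sleeps the full interval after each failed try, which is what the assertions above rely on):

    public final class RetryTimingSketch {
      /** Lower bound on wall-clock time for failed job-end notification. */
      static long minimumElapsedMs(int tries, long intervalMs) {
        return tries * intervalMs;
      }

      public static void main(String[] args) {
        System.out.println(minimumElapsedMs(1, 5000)); // first block:  > 5000 ms
        System.out.println(minimumElapsedMs(3, 3000)); // second block: > 9000 ms
      }
    }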
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1408938&r1=1408937&r2=1408938&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Tue Nov 13 20:21:39 2012
@@ -25,12 +25,15 @@ import java.util.concurrent.CountDownLat
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -39,12 +42,18 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.junit.Test;
 
 /**
  * Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
  *
  */
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestKill {
 
   @Test
@@ -132,6 +141,80 @@ public class TestKill {
   }
 
   @Test
+  public void testKillTaskWait() throws Exception {
+    final Dispatcher dispatcher = new AsyncDispatcher() {
+      private TaskAttemptEvent cachedKillEvent;
+      @Override
+      protected void dispatch(Event event) {
+        if (event instanceof TaskAttemptEvent) {
+          TaskAttemptEvent killEvent = (TaskAttemptEvent) event;
+          if (killEvent.getType() == TaskAttemptEventType.TA_KILL) {
+            TaskAttemptId taID = killEvent.getTaskAttemptID();
+            if (taID.getTaskId().getTaskType() == TaskType.REDUCE
+                && taID.getTaskId().getId() == 0 && taID.getId() == 0) {
+              // Task is asking the reduce TA to kill itself. 'Create' a race
+              // condition. Make the task succeed and then inform the task that
+              // TA has succeeded. Once Task gets the TA succeeded event at
+              // KILL_WAIT, then relay the actual kill signal to TA
+              super.dispatch(new TaskAttemptEvent(taID,
+                  TaskAttemptEventType.TA_DONE));
+              super.dispatch(new TaskAttemptEvent(taID,
+                  TaskAttemptEventType.TA_CONTAINER_CLEANED));
+              super.dispatch(new TaskTAttemptEvent(taID,
+                  TaskEventType.T_ATTEMPT_SUCCEEDED));
+              this.cachedKillEvent = killEvent;
+              return;
+            }
+          }
+        } else if (event instanceof TaskEvent) {
+          TaskEvent taskEvent = (TaskEvent) event;
+          if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED
+              && this.cachedKillEvent != null) {
+            // When the TA comes and reports that it is done, send the
+            // cachedKillEvent
+            super.dispatch(this.cachedKillEvent);
+            return;
+          }
+
+        }
+        super.dispatch(event);
+      }
+    };
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+      @Override
+      public Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    Job job = app.submit(new Configuration());
+    JobId jobId = app.getJobId();
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    app.waitForState(mapTask, TaskState.RUNNING);
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    // Finish map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    // Now kill the job
+    app.getContext().getEventHandler()
+        .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+  }
+
+  @Test
   public void testKillTaskAttempt() throws Exception {
     final CountDownLatch latch = new CountDownLatch(1);
     MRApp app = new BlockingMRApp(2, 0, latch);
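
[Editor's note] testKillTaskWait forces the KILL_WAIT race deterministically by interposing on the dispatcher: the kill event for the reduce attempt is held back, success events are injected in front of it, and the kill is only released once the task has observed the success. The same interception technique in a self-contained form (toy event types, not YARN's):

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class DispatchInterceptSketch {
      interface Event { String type(); }

      /** Delays a chosen event until a trigger event has been delivered. */
      static class ReorderingDispatcher {
        private final Queue<Event> held = new ArrayDeque<Event>();

        void dispatch(Event e) {
          if (e.type().equals("KILL")) {
            held.add(e);              // hold the kill back...
            dispatch(event("DONE"));  // ...and let a success race in front
            return;
          }
          deliver(e);
          if (e.type().equals("DONE")) {
            while (!held.isEmpty()) {
              deliver(held.poll());   // now release the cached kill
            }
          }
        }

        void deliver(Event e) { System.out.println("delivered " + e.type()); }
      }

      static Event event(final String t) {
        return new Event() { public String type() { return t; } };
      }

      public static void main(String[] args) {
        // Prints "delivered DONE" and then "delivered KILL": the kill is
        // observed only after the success, recreating the race on demand.
        new ReorderingDispatcher().dispatch(event("KILL"));
      }
    }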
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1408938&r1=1408937&r2=1408938&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Nov 13 20:21:39 2012
@@ -141,7 +141,6 @@ public class TestTaskImpl {
 
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
-    private TaskAttemptId attemptId;
     private TaskType taskType;
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
@@ -152,14 +151,11 @@ public class TestTaskImpl {
         AppContext appContext, TaskType taskType) {
       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
           dataLocations, committer, jobToken, credentials, clock, appContext);
-      attemptId = Records.newRecord(TaskAttemptId.class);
-      attemptId.setId(id);
-      attemptId.setTaskId(taskId);
       this.taskType = taskType;
     }
 
     public TaskAttemptId getAttemptId() {
-      return attemptId;
+      return getID();
     }
 
     @Override
@@ -561,4 +557,49 @@ public class TestTaskImpl {
     mockTask = createMockTask(TaskType.REDUCE);
     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
   }
+
+  @Test
+  public void testSpeculativeMapFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt killed
+    mockTask = createMockTask(TaskType.MAP);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
+
+  @Test
+  public void testSpeculativeMapMultipleSucceedFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt succeeds
+    mockTask = createMockTask(TaskType.MAP);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
+
+  @Test
+  public void testSpeculativeMapFailedFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt succeeds
+    mockTask = createMockTask(TaskType.MAP);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL:
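
[Editor's note] The three new tests exercise MAPREDUCE-4425: a map that already SUCCEEDED can be retroactively failed when reducers report fetch failures against its output, and the task must fall back to SCHEDULED with a fresh attempt instead of hanging. The state evolution they assert, reduced to a toy model (hypothetical types, not the real TaskImpl state machine):

    import java.util.ArrayList;
    import java.util.List;

    public class RetroactiveFailureSketch {
      enum TaskState { SCHEDULED, SUCCEEDED }

      static class ToyTask {
        TaskState state = TaskState.SCHEDULED;
        final List<String> attempts = new ArrayList<String>();
        String successfulAttempt;

        void addAttempt() { attempts.add("attempt_" + attempts.size()); }

        void attemptSucceeded(String id) {
          successfulAttempt = id;
          state = TaskState.SUCCEEDED;
        }

        // Fetch failures arrive after success: un-succeed and reschedule.
        void attemptRetroactivelyFailed(String id) {
          if (id.equals(successfulAttempt)) {
            successfulAttempt = null;
            state = TaskState.SCHEDULED;
            addAttempt(); // schedule a replacement attempt
          }
        }
      }

      public static void main(String[] args) {
        ToyTask task = new ToyTask();
        task.addAttempt();                            // original attempt
        task.addAttempt();                            // speculative attempt
        task.attemptSucceeded("attempt_1");           // speculation wins
        task.attemptRetroactivelyFailed("attempt_1"); // fetch failures
        System.out.println(task.state + ", attempts=" + task.attempts.size());
        // prints: SCHEDULED, attempts=3 -- matching the assertions above
      }
    }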
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1408938&r1=1408937&r2=1408938&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Tue Nov 13 20:21:39 2012
@@ -51,7 +51,7 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <systemPropertyVariables>
-            <log4j.configuration>file:///${project.parent.basedir}/../src/test/log4j.properties</log4j.configuration>
+            <log4j.configuration>file:///${project.basedir}/src/test/resources/log4j.properties</log4j.configuration>
           </systemPropertyVariables>
         </configuration>
       </plugin>

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1408938&r1=1408937&r2=1408938&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Tue Nov 13 20:21:39 2012
@@ -610,36 +610,6 @@
 </description>
 </property>
 
-<!-- Job Notification Configuration -->
-
-<!--
-<property>
- <name>mapreduce.job.end-notification.url</name>
- <value>http://localhost:8080/jobstatus.php?jobId=$jobId&jobStatus=$jobStatus</value>
- <description>Indicates url which will be called on completion of job to inform
-              end status of job.
-              User can give at most 2 variables with URI : $jobId and $jobStatus.
-              If they are present in URI, then they will be replaced by their
-              respective values.
-</description>
-</property>
--->
-
-<property>
-  <name>mapreduce.job.end-notification.retry.attempts</name>
-  <value>0</value>
-  <description>Indicates how many times hadoop should attempt to contact the
-               notification URL </description>
-</property>
-
-<property>
-  <name>mapreduce.job.end-notification.retry.interval</name>
-  <value>30000</value>
-  <description>Indicates time in milliseconds between notification URL retry
-               calls</description>
-</property>
-
-
 <property>
   <name>mapreduce.job.queuename</name>
   <value>default</value>
@@ -802,49 +772,53 @@
   </description>
 </property>
 
+<!-- Job Notification Configuration -->
 <property>
-  <name>mapreduce.job.end-notification.max.attempts</name>
-  <value>5</value>
-  <final>true</final>
-  <description>The maximum number of times a URL will be read for providing job
-    end notification. Cluster administrators can set this to limit how long
-    after end of a job, the Application Master waits before exiting. Must be
-    marked as final to prevent users from overriding this.
-  </description>
+  <name>mapreduce.job.end-notification.url</name>
+  <!--<value>http://localhost:8080/jobstatus.php?jobId=$jobId&jobStatus=$jobStatus</value>-->
+  <description>Indicates url which will be called on completion of job to inform
+               end status of job.
+               User can give at most 2 variables with URI : $jobId and $jobStatus.
+               If they are present in URI, then they will be replaced by their
+               respective values.
+</description>
 </property>
 
 <property>
-  <name>mapreduce.job.end-notification.max.retry.interval</name>
-  <value>5</value>
-  <final>true</final>
-  <description>The maximum amount of time (in seconds) to wait before retrying
-    job end notification. Cluster administrators can set this to limit how long
-    the Application Master waits before exiting. Must be marked as final to
-    prevent users from overriding this.</description>
+  <name>mapreduce.job.end-notification.retry.attempts</name>
+  <value>0</value>
+  <description>The number of times the submitter of the job wants to retry job
+    end notification if it fails. This is capped by
+    mapreduce.job.end-notification.max.attempts</description>
 </property>
 
 <property>
-  <name>mapreduce.job.end-notification.url</name>
-  <value></value>
-  <description>The URL to send job end notification. It may contain sentinels
-    $jobId and $jobStatus which will be replaced with jobId and jobStatus.
-  </description>
+  <name>mapreduce.job.end-notification.retry.interval</name>
+  <value>1000</value>
+  <description>The number of milliseconds the submitter of the job wants to
+    wait before job end notification is retried if it fails. This is capped by
+    mapreduce.job.end-notification.max.retry.interval</description>
 </property>
 
 <property>
-  <name>mapreduce.job.end-notification.retry.attempts</name>
+  <name>mapreduce.job.end-notification.max.attempts</name>
   <value>5</value>
-  <description>The number of times the submitter of the job wants to retry job
-    end notification if it fails. This is capped by
-    mapreduce.job.end-notification.max.attempts</description>
+  <final>true</final>
+  <description>The maximum number of times a URL will be read for providing job
+    end notification. Cluster administrators can set this to limit how long
+    after end of a job, the Application Master waits before exiting. Must be
+    marked as final to prevent users from overriding this.
+  </description>
 </property>
 
 <property>
-  <name>mapreduce.job.end-notification.retry.interval</name>
-  <value>1</value>
-  <description>The number of seconds the submitter of the job wants to wait
-    before job end notification is retried if it fails. This is capped by
-    mapreduce.job.end-notification.max.retry.interval</description>
+  <name>mapreduce.job.end-notification.max.retry.interval</name>
+  <value>5000</value>
+  <final>true</final>
+  <description>The maximum amount of time (in milliseconds) to wait before
+    retrying job end notification. Cluster administrators can set this to
+    limit how long the Application Master waits before exiting. Must be marked
+    as final to prevent users from overriding this.</description>
 </property>
 
 <property>

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1407704-1408926
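
[Editor's note] After this change the mapred-default.xml properties are grouped under one "Job Notification Configuration" heading and all intervals are uniformly in milliseconds. A sketch of how a job submitter would use the reorganized properties (the URL and values here are illustrative; $jobId and $jobStatus are the sentinels documented above):

    import org.apache.hadoop.conf.Configuration;

    public class JobEndNotificationConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Called once the job finishes; the sentinels are expanded by the AM.
        conf.set("mapreduce.job.end-notification.url",
            "http://callback.example.com/status?jobId=$jobId&jobStatus=$jobStatus");
        // Retry 3 times, 2 seconds apart. Both values are capped by the
        // final, admin-owned *.max.attempts / *.max.retry.interval settings
        // (5 attempts and 5000 ms by default after this change).
        conf.setInt("mapreduce.job.end-notification.retry.attempts", 3);
        conf.setInt("mapreduce.job.end-notification.retry.interval", 2000);
      }
    }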