MAPREDUCE-5817. Mappers get rescheduled on node transition even after all 
reducers are completed. (Sangjin Lee via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/27d24f96
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/27d24f96
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/27d24f96

Branch: refs/heads/HDFS-7285
Commit: 27d24f96ab8d17e839a1ef0d7076efc78d28724a
Parents: 84bf712
Author: Karthik Kambatla <ka...@apache.org>
Authored: Fri Aug 14 12:29:50 2015 -0700
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Fri Aug 14 12:29:50 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../mapreduce/v2/app/job/impl/JobImpl.java      |  38 ++++--
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  | 130 ++++++++++++++++++-
 3 files changed, 156 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d24f96/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c424667..19bd697 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -552,6 +552,9 @@ Release 2.8.0 - UNRELEASED
 
     MAPREDUCE-6433. launchTime may be negative. (Zhihai Xu)
 
+    MAPREDUCE-5817. Mappers get rescheduled on node transition even after all
+    reducers are completed. (Sangjin Lee via kasha)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d24f96/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 9d141eb..fc9a3a5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -129,6 +129,7 @@ import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /** Implementation of Job interface. Maintains the state machines of Job.
@@ -1330,15 +1331,21 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   
   private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) {
     // rerun previously successful map tasks
-    List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId);
-    if(taskAttemptIdList != null) {
-      String mesg = "TaskAttempt killed because it ran on unusable node "
-          + nodeId;
-      for(TaskAttemptId id : taskAttemptIdList) {
-        if(TaskType.MAP == id.getTaskId().getTaskType()) {
-          // reschedule only map tasks because their outputs may be unusable
-          LOG.info(mesg + ". AttemptId:" + id);
-          eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
+    // do this only if the job is still in the running state and there are
+    // running reducers
+    if (getInternalState() == JobStateInternal.RUNNING &&
+        !allReducersComplete()) {
+      List<TaskAttemptId> taskAttemptIdList =
+          nodesToSucceededTaskAttempts.get(nodeId);
+      if (taskAttemptIdList != null) {
+        String mesg = "TaskAttempt killed because it ran on unusable node "
+            + nodeId;
+        for (TaskAttemptId id : taskAttemptIdList) {
+          if (TaskType.MAP == id.getTaskId().getTaskType()) {
+            // reschedule only map tasks because their outputs may be unusable
+            LOG.info(mesg + ". AttemptId:" + id);
+            eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
+          }
         }
       }
     }
@@ -1346,6 +1353,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     // RMContainerAllocator
   }
 
+  private boolean allReducersComplete() {
+    return numReduceTasks == 0 || numReduceTasks == getCompletedReduces();
+  }
+
   /*
   private int getBlockSize() {
     String inputClassName = conf.get(MRJobConfig.INPUT_FORMAT_CLASS_ATTR);
@@ -2084,13 +2095,18 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     }
   }
 
+  @VisibleForTesting
+  void decrementSucceededMapperCount() {
+    completedTaskCount--;
+    succeededMapTaskCount--;
+  }
+
   private static class MapTaskRescheduledTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
       //succeeded map task is restarted back
-      job.completedTaskCount--;
-      job.succeededMapTaskCount--;
+      job.decrementSucceededMapperCount();
     }
   }
 

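For readers skimming the patch: the heart of the fix is the new guard at the top of actOnUnusableNode(). Succeeded map attempts on a node that became unusable are re-run only while the job is still RUNNING and at least one reducer may still need to fetch their output; once all reducers have completed, the map outputs are no longer needed, so rescheduling would only stall the job (the behavior this JIRA reports). Below is a minimal standalone sketch of that guard; the class and field names are illustrative stand-ins, not the real JobImpl members.

    // [Sketch] Simplified stand-in for the state JobImpl consults in its guard.
    class UnusableNodeGuardSketch {

      enum State { RUNNING, COMMITTING, SUCCEEDED }

      static final class JobSnapshot {
        final State state;
        final int numReduceTasks;    // total reducers in the job
        final int completedReduces;  // reducers that have finished

        JobSnapshot(State state, int numReduceTasks, int completedReduces) {
          this.state = state;
          this.numReduceTasks = numReduceTasks;
          this.completedReduces = completedReduces;
        }

        // mirrors JobImpl.allReducersComplete(): a map-only job counts as done
        boolean allReducersComplete() {
          return numReduceTasks == 0 || numReduceTasks == completedReduces;
        }
      }

      // Succeeded map attempts on a lost node are worth re-running only while
      // reducers may still need to fetch their output.
      static boolean shouldRescheduleMaps(JobSnapshot job) {
        return job.state == State.RUNNING && !job.allReducersComplete();
      }

      public static void main(String[] args) {
        // reducer still running: lost map output must be regenerated -> true
        System.out.println(shouldRescheduleMaps(new JobSnapshot(State.RUNNING, 1, 0)));
        // all reducers finished: rescheduling only delays the job -> false
        System.out.println(shouldRescheduleMaps(new JobSnapshot(State.RUNNING, 1, 1)));
        // map-only job: the map output is the final output -> false
        System.out.println(shouldRescheduleMaps(new JobSnapshot(State.RUNNING, 0, 0)));
      }
    }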
http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d24f96/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index cae9663..2af4380 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -19,15 +19,21 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
@@ -35,10 +41,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.jobhistory.EventType;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
-import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -46,10 +48,17 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -64,7 +73,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
@@ -76,6 +89,9 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
@@ -508,6 +524,112 @@ public class TestJobImpl {
     commitHandler.stop();
   }
 
+  @Test(timeout=20000)
+  public void testUnusableNodeTransition() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
+    // add a special task event handler to put the task back to running in case
+    // of task rescheduling/killing
+    EventHandler<TaskAttemptEvent> taskAttemptEventHandler =
+        new EventHandler<TaskAttemptEvent>() {
+      @Override
+      public void handle(TaskAttemptEvent event) {
+        if (event.getType() == TaskAttemptEventType.TA_KILL) {
+          job.decrementSucceededMapperCount();
+        }
+      }
+    };
+    dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
+
+    // replace the tasks with spied versions to return the right attempts
+    Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>();
+    List<NodeReport> nodeReports = new ArrayList<NodeReport>();
+    Map<NodeReport,TaskId> nodeReportsToTaskIds =
+        new HashMap<NodeReport,TaskId>();
+    for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) {
+      TaskId taskId = e.getKey();
+      Task task = e.getValue();
+      if (taskId.getTaskType() == TaskType.MAP) {
+        // add an attempt to the task to simulate nodes
+        NodeId nodeId = mock(NodeId.class);
+        TaskAttempt attempt = mock(TaskAttempt.class);
+        when(attempt.getNodeId()).thenReturn(nodeId);
+        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+        when(attempt.getID()).thenReturn(attemptId);
+        // create a spied task
+        Task spied = spy(task);
+        doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
+        spiedTasks.put(taskId, spied);
+
+        // create a NodeReport based on the node id
+        NodeReport report = mock(NodeReport.class);
+        when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
+        when(report.getNodeId()).thenReturn(nodeId);
+        nodeReports.add(report);
+        nodeReportsToTaskIds.put(report, taskId);
+      }
+    }
+    // replace the tasks with the spied tasks
+    job.tasks.putAll(spiedTasks);
+
+    // complete all mappers first
+    for (TaskId taskId: job.tasks.keySet()) {
+      if (taskId.getTaskType() == TaskType.MAP) {
+        // generate a task attempt completed event first to populate the
+        // nodes-to-succeeded-attempts map
+        TaskAttemptCompletionEvent tce =
+            Records.newRecord(TaskAttemptCompletionEvent.class);
+        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+        tce.setAttemptId(attemptId);
+        tce.setStatus(TaskAttemptCompletionEventStatus.SUCCEEDED);
+        job.handle(new JobTaskAttemptCompletedEvent(tce));
+        // complete the task itself
+        job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+        Assert.assertEquals(JobState.RUNNING, job.getState());
+      }
+    }
+
+    // add an event for a node transition
+    NodeReport firstMapperNodeReport = nodeReports.get(0);
+    NodeReport secondMapperNodeReport = nodeReports.get(1);
+    job.handle(new JobUpdatedNodesEvent(job.getID(),
+        Collections.singletonList(firstMapperNodeReport)));
+    // complete the reducer
+    for (TaskId taskId: job.tasks.keySet()) {
+      if (taskId.getTaskType() == TaskType.REDUCE) {
+        job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+      }
+    }
+    // add another event for a node transition for the other mapper
+    // this should not trigger rescheduling
+    job.handle(new JobUpdatedNodesEvent(job.getID(),
+        Collections.singletonList(secondMapperNodeReport)));
+    // complete the first mapper that was rescheduled
+    TaskId firstMapper = nodeReportsToTaskIds.get(firstMapperNodeReport);
+    job.handle(new JobTaskEvent(firstMapper, TaskState.SUCCEEDED));
+    // verify the state is moving to committing
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    // let the committer complete and verify the job succeeds
+    syncBarrier.await();
+    assertJobState(job, JobStateInternal.SUCCEEDED);
+
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobImpl t = new TestJobImpl();
     t.testJobNoTasks();

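In outline, testUnusableNodeTransition exercises exactly the scenario from the JIRA: both mappers succeed, the first mapper's node goes UNHEALTHY while the reducer is still running (so that mapper must be rescheduled), the reducer succeeds, and then the second mapper's node goes UNHEALTHY; with the guard in place this second transition must not kill anything, and the job must proceed to COMMITTING and SUCCEEDED. The only external API it leans on for node transitions is a mocked NodeReport; a small helper extracted from the same Mockito calls the test uses might look like this (the helper name is hypothetical):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.api.records.NodeReport;
    import org.apache.hadoop.yarn.api.records.NodeState;

    class UnhealthyNodeReportSketch {
      // builds the same kind of mocked report the test feeds into
      // JobUpdatedNodesEvent to simulate a node turning unhealthy
      static NodeReport unhealthyReportFor(NodeId nodeId) {
        NodeReport report = mock(NodeReport.class);
        when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
        when(report.getNodeId()).thenReturn(nodeId);
        return report;
      }
    }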