MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138

Branch: refs/heads/YARN-5881
Commit: a32e0138fb63c92902e6613001f38a87c8a41321
Parents: 312e57b
Author: Haibo Chen <haiboc...@apache.org>
Authored: Thu Aug 10 15:17:36 2017 -0700
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Thu Aug 10 15:17:36 2017 -0700

----------------------------------------------------------------------
 .../mapreduce/v2/app/job/impl/JobImpl.java      |  35 ++++-
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  | 139 +++++++++++++++----
 .../apache/hadoop/mapreduce/MRJobConfig.java    |   6 +-
 .../src/main/resources/mapred-default.xml       |   8 ++
 4 files changed, 160 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4d155d0..6880b6c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -644,6 +644,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
   private float reduceProgress;
   private float cleanupProgress;
   private boolean isUber = false;
+  private boolean finishJobWhenReducersDone;
+  private boolean completingJob = false;
 
   private Credentials jobCredentials;
   private Token<JobTokenIdentifier> jobToken;
@@ -717,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
     this.maxFetchFailuresNotifications = conf.getInt(
         MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
         MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
+    this.finishJobWhenReducersDone = conf.getBoolean(
+        MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
+        MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
   }
 
   protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@@ -2021,7 +2026,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
                 TimeUnit.MILLISECONDS);
         return JobStateInternal.FAIL_WAIT;
       }
-      
+
+      checkReadyForCompletionWhenAllReducersDone(job);
+
       return job.checkReadyForCommit();
     }
 
@@ -2052,6 +2059,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       }
       job.metrics.killedTask(task);
     }
+
+    /** Improvement: if all reducers have finished, check whether there are
+        restarted mappers that are still running. This can happen when a
+        node becomes UNHEALTHY and mappers are rescheduled.
+        See MAPREDUCE-6870 for details. */
+    private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
+      if (job.finishJobWhenReducersDone) {
+        int totalReduces = job.getTotalReduces();
+        int completedReduces = job.getCompletedReduces();
+
+        if (totalReduces > 0 && totalReduces == completedReduces
+            && !job.completingJob) {
+
+          for (TaskId mapTaskId : job.mapTasks) {
+            MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
+            if (!task.isFinished()) {
+              LOG.info("Killing map task " + task.getID());
+              job.eventHandler.handle(
+                  new TaskEvent(task.getID(), TaskEventType.T_KILL));
+            }
+          }
+
+          job.completingJob = true;
+        }
+      }
+    }
   }
 
   // Transition class for handling jobs with no tasks

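For illustration, here is a self-contained sketch of the pattern the new transition implements: once every reducer has succeeded and the job is not already completing, each unfinished mapper gets a kill request, exactly once. The class, task-id strings, and plain logging below are hypothetical stand-ins for JobImpl's internals, not the real MR types:

  import java.util.LinkedHashMap;
  import java.util.Map;

  /** Hypothetical stand-in illustrating the transition logic above. */
  public class ReducersDoneCheck {
    private final boolean finishJobWhenReducersDone;
    private boolean completingJob = false;
    // map task id -> finished flag
    private final Map<String, Boolean> mapTasks = new LinkedHashMap<>();
    private final int totalReduces;
    private final int completedReduces;

    ReducersDoneCheck(boolean enabled, int totalReduces, int completedReduces) {
      this.finishJobWhenReducersDone = enabled;
      this.totalReduces = totalReduces;
      this.completedReduces = completedReduces;
    }

    // Mirrors checkReadyForCompletionWhenAllReducersDone(): once every
    // reducer is done, request a kill for each unfinished mapper.
    void checkReadyForCompletionWhenAllReducersDone() {
      if (!finishJobWhenReducersDone) {
        return;
      }
      if (totalReduces > 0 && totalReduces == completedReduces
          && !completingJob) {
        for (Map.Entry<String, Boolean> e : mapTasks.entrySet()) {
          if (!e.getValue()) {
            // JobImpl sends TaskEventType.T_KILL here; the sketch just logs.
            System.out.println("Killing map task " + e.getKey());
          }
        }
        completingJob = true;
      }
    }

    public static void main(String[] args) {
      ReducersDoneCheck job = new ReducersDoneCheck(true, 1, 1);
      job.mapTasks.put("task_m_000000", false); // rescheduled, still running
      job.mapTasks.put("task_m_000001", true);  // already finished
      job.checkReadyForCompletionWhenAllReducersDone();
    }
  }

In the real code the kill is delivered asynchronously as a TaskEventType.T_KILL event, and the completingJob flag guarantees the loop runs at most once per job.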
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 2147ec1..1827ce4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -564,33 +564,13 @@ public class TestJobImpl {
     dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler);
 
     // replace the tasks with spied versions to return the right attempts
-    Map<TaskId,Task> spiedTasks = new HashMap<TaskId,Task>();
-    List<NodeReport> nodeReports = new ArrayList<NodeReport>();
-    Map<NodeReport,TaskId> nodeReportsToTaskIds =
-        new HashMap<NodeReport,TaskId>();
-    for (Map.Entry<TaskId,Task> e: job.tasks.entrySet()) {
-      TaskId taskId = e.getKey();
-      Task task = e.getValue();
-      if (taskId.getTaskType() == TaskType.MAP) {
-        // add an attempt to the task to simulate nodes
-        NodeId nodeId = mock(NodeId.class);
-        TaskAttempt attempt = mock(TaskAttempt.class);
-        when(attempt.getNodeId()).thenReturn(nodeId);
-        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-        when(attempt.getID()).thenReturn(attemptId);
-        // create a spied task
-        Task spied = spy(task);
-        doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
-        spiedTasks.put(taskId, spied);
+    Map<TaskId, Task> spiedTasks = new HashMap<>();
+    List<NodeReport> nodeReports = new ArrayList<>();
+    Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
+
+    createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
+        NodeState.UNHEALTHY, nodeReports);
 
-        // create a NodeReport based on the node id
-        NodeReport report = mock(NodeReport.class);
-        when(report.getNodeState()).thenReturn(NodeState.UNHEALTHY);
-        when(report.getNodeId()).thenReturn(nodeId);
-        nodeReports.add(report);
-        nodeReportsToTaskIds.put(report, taskId);
-      }
-    }
     // replace the tasks with the spied tasks
     job.tasks.putAll(spiedTasks);
 
@@ -641,6 +621,82 @@ public class TestJobImpl {
     commitHandler.stop();
   }
 
+  @Test
+  public void testJobCompletedWhenAllReducersAreFinished()
+      throws Exception {
+    testJobCompletionWhenReducersAreFinished(true);
+  }
+
+  @Test
+  public void testJobNotCompletedWhenAllReducersAreFinished()
+      throws Exception {
+    testJobCompletionWhenReducersAreFinished(false);
+  }
+
+  private void testJobCompletionWhenReducersAreFinished(boolean killMappers)
+      throws InterruptedException, BrokenBarrierException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, killMappers);
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    final List<TaskEvent> killedEvents =
+        Collections.synchronizedList(new ArrayList<TaskEvent>());
+    dispatcher.register(TaskEventType.class, new EventHandler<TaskEvent>() {
+      @Override
+      public void handle(TaskEvent event) {
+        if (event.getType() == TaskEventType.T_KILL) {
+          killedEvents.add(event);
+        }
+      }
+    });
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    final JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
+
+    // replace the tasks with spied versions to return the right attempts
+    Map<TaskId, Task> spiedTasks = new HashMap<>();
+    List<NodeReport> nodeReports = new ArrayList<>();
+    Map<NodeReport, TaskId> nodeReportsToTaskIds = new HashMap<>();
+
+    createSpiedMapTasks(nodeReportsToTaskIds, spiedTasks, job,
+        NodeState.RUNNING, nodeReports);
+
+    // replace the tasks with the spied tasks
+    job.tasks.putAll(spiedTasks);
+
+    // finish reducer
+    for (TaskId taskId: job.tasks.keySet()) {
+      if (taskId.getTaskType() == TaskType.REDUCE) {
+        job.handle(new JobTaskEvent(taskId, TaskState.SUCCEEDED));
+      }
+    }
+
+    dispatcher.await();
+
+    /*
+     * StubbedJob cannot finish in this test - we would have to generate
+     * the necessary events manually, which wouldn't add much value.
+     * Instead, we validate the T_KILL events.
+     */
+    if (killMappers) {
+      Assert.assertEquals("Number of killed events", 2, killedEvents.size());
+      Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000000",
+          killedEvents.get(0).getTaskID().toString());
+      Assert.assertEquals("AttemptID", "task_1234567890000_0001_m_000001",
+          killedEvents.get(1).getTaskID().toString());
+    } else {
+      Assert.assertEquals("Number of killed events", 0, killedEvents.size());
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestJobImpl t = new TestJobImpl();
     t.testJobNoTasks();
@@ -1021,6 +1077,37 @@ public class TestJobImpl {
     Assert.assertEquals(state, job.getInternalState());
   }
 
+  private void createSpiedMapTasks(Map<NodeReport, TaskId>
+      nodeReportsToTaskIds, Map<TaskId, Task> spiedTasks, JobImpl job,
+      NodeState nodeState, List<NodeReport> nodeReports) {
+    for (Map.Entry<TaskId, Task> e: job.tasks.entrySet()) {
+      TaskId taskId = e.getKey();
+      Task task = e.getValue();
+      if (taskId.getTaskType() == TaskType.MAP) {
+        // add an attempt to the task to simulate nodes
+        NodeId nodeId = mock(NodeId.class);
+        TaskAttempt attempt = mock(TaskAttempt.class);
+        when(attempt.getNodeId()).thenReturn(nodeId);
+        TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+        when(attempt.getID()).thenReturn(attemptId);
+        // create a spied task
+        Task spied = spy(task);
+        Map<TaskAttemptId, TaskAttempt> attemptMap = new HashMap<>();
+        attemptMap.put(attemptId, attempt);
+        when(spied.getAttempts()).thenReturn(attemptMap);
+        doReturn(attempt).when(spied).getAttempt(any(TaskAttemptId.class));
+        spiedTasks.put(taskId, spied);
+
+        // create a NodeReport based on the node id
+        NodeReport report = mock(NodeReport.class);
+        when(report.getNodeState()).thenReturn(nodeState);
+        when(report.getNodeId()).thenReturn(nodeId);
+        nodeReports.add(report);
+        nodeReportsToTaskIds.put(report, taskId);
+      }
+    }
+  }
+
   private static class JobSubmittedEventHandler implements
       EventHandler<JobHistoryEvent> {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index cfc1bcc..2023ba3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -431,7 +431,7 @@ public interface MRJobConfig {
   public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
 
   public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
-  
+
   public static final String JOB_RUNNING_MAP_LIMIT =
       "mapreduce.job.running.map.limit";
   public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0;
@@ -1033,4 +1033,8 @@ public interface MRJobConfig {
   String MR_JOB_REDACTED_PROPERTIES = "mapreduce.job.redacted-properties";
 
   String MR_JOB_SEND_TOKEN_CONF = "mapreduce.job.send-token-conf";
+
+  String FINISH_JOB_WHEN_REDUCERS_DONE =
+      "mapreduce.job.finish-when-all-reducers-done";
+  boolean DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE = true;
 }

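Since FINISH_JOB_WHEN_REDUCERS_DONE is exposed on the public MRJobConfig interface, a job driver can toggle the behavior per job through the standard Configuration API. A minimal sketch, assuming a placeholder driver class and job name:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.MRJobConfig;

  public class ExampleDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Opt out of the new behavior: wait for all (possibly rescheduled)
      // mappers even after every reducer has finished, as before.
      conf.setBoolean(MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, false);
      Job job = Job.getInstance(conf, "example");
      // ... set jar, mapper, reducer and input/output paths as usual ...
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }

Leaving the key unset keeps DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE (true), i.e. the job completes as soon as the reducers do.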
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 101aa07..ee9b906 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1126,6 +1126,14 @@
 </property>
 
 <property>
+  <name>mapreduce.job.finish-when-all-reducers-done</name>
+  <value>true</value>
+  <description>Specifies whether the job should complete once all reducers
+     have finished, regardless of whether there are still running mappers.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.job.token.tracking.ids.enabled</name>
   <value>false</value>
   <description>Whether to write tracking ids of tokens to


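Because mapred-default.xml now ships the key with a default of true, a cluster-wide opt-out belongs in mapred-site.xml. A sketch of such an override (false restores the old behavior of waiting for every mapper):

  <property>
    <name>mapreduce.job.finish-when-all-reducers-done</name>
    <value>false</value>
  </property>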