Author: jlowe Date: Fri Jan 17 21:38:37 2014 New Revision: 1559256 URL: http://svn.apache.org/r1559256 Log: MAPREDUCE-5717. Task pings are interpreted as task progress. Contributed by Jason Lowe
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1559256&r1=1559255&r2=1559256&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Jan 17 21:38:37 2014 @@ -146,6 +146,8 @@ Trunk (Unreleased) MAPREDUCE-5191. TestQueue#testQueue fails with timeout on Windows. (Ivan Mitic via hitesh) + MAPREDUCE-5717. Task pings are interpreted as task progress (jlowe) + Release 2.4.0 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1559256&r1=1559255&r2=1559256&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Jan 17 21:38:37 2014 @@ -361,7 +361,6 @@ public class TaskAttemptListenerImpl ext if (taskStatus == null) { //We are using statusUpdate only as a simple ping LOG.info("Ping from " + taskAttemptID.toString()); - taskHeartbeatHandler.progressing(yarnAttemptID); return feedback; } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1559256&r1=1559255&r2=1559256&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Jan 17 21:38:37 2014 @@ -381,4 +381,50 @@ public class TestTaskAttemptListenerImpl } + @SuppressWarnings("rawtypes") + @Test + public void testStatusUpdateProgress() + throws IOException, InterruptedException { + AppContext appCtx = mock(AppContext.class); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); + TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + Dispatcher dispatcher = mock(Dispatcher.class); + EventHandler ea = mock(EventHandler.class); + when(dispatcher.getEventHandler()).thenReturn(ea); + + when(appCtx.getEventHandler()).thenReturn(ea); + CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); + policy.init(appCtx); + MockTaskAttemptListenerImpl listener = + new MockTaskAttemptListenerImpl(appCtx, secret, + rmHeartbeatHandler, hbHandler, policy); + Configuration conf = new Configuration(); + listener.init(conf); + listener.start(); + JVMId id = new JVMId("foo",1, true, 1); + WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + + TaskAttemptID attemptID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1); + TaskAttemptId attemptId = TypeConverter.toYarn(attemptID); + Task task = mock(Task.class); + listener.registerPendingTask(task, wid); + listener.registerLaunchedTask(attemptId, wid); + verify(hbHandler).register(attemptId); + + // make sure a ping doesn't report progress + AMFeedback feedback = listener.statusUpdate(attemptID, null); + assertTrue(feedback.getTaskFound()); + verify(hbHandler, never()).progressing(eq(attemptId)); + + // make sure a status update does report progress + MapTaskStatus mockStatus = new MapTaskStatus(attemptID, 0.0f, 1, + TaskStatus.State.RUNNING, "", "RUNNING", "", TaskStatus.Phase.MAP, + new Counters()); + feedback = listener.statusUpdate(attemptID, mockStatus); + assertTrue(feedback.getTaskFound()); + verify(hbHandler).progressing(eq(attemptId)); + listener.close(); + } }