Author: tgraves
Date: Wed Feb  6 22:22:22 2013
New Revision: 1443243

URL: http://svn.apache.org/viewvc?rev=1443243&view=rev
Log:
MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and 
UNASSIGNED states. (Mayank Bansal via tgraves)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1443243&r1=1443242&r2=1443243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Feb  6 22:22:22 2013
@@ -47,6 +47,9 @@ Release 0.23.7 - UNRELEASED
 
     MAPREDUCE-4953. HadoopPipes misuses fprintf. (Andy Isaacson via tgraves)
 
+    MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and 
+    UNASSIGNED states. (Mayank Bansal via tgraves)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1443243&r1=1443242&r2=1443243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Wed Feb  6 22:22:22 2013
@@ -196,6 +196,10 @@ public abstract class TaskAttemptImpl im
          TaskAttemptEventType.TA_KILL, new KilledTransition())
     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
+     .addTransition(TaskAttemptStateInternal.NEW,
+          TaskAttemptStateInternal.NEW,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
 
      // Transitions from the UNASSIGNED state.
      .addTransition(TaskAttemptStateInternal.UNASSIGNED,
@@ -207,6 +211,10 @@ public abstract class TaskAttemptImpl im
     .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
              TaskAttemptStateInternal.FAILED, true))
+     .addTransition(TaskAttemptStateInternal.UNASSIGNED,
+          TaskAttemptStateInternal.UNASSIGNED,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
 
      // Transitions from the ASSIGNED state.
     .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,

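For readers of this change: the two new arcs above route TA_DIAGNOSTICS_UPDATE events back into the same state (NEW -> NEW, UNASSIGNED -> UNASSIGNED) instead of letting the state machine raise an invalid-transition error. The DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION they reference is defined elsewhere in TaskAttemptImpl and is not part of this diff; a minimal sketch of such a single-arc transition (the class name and the addDiagnosticInfo helper are assumptions here, not quoted from the patch) would look roughly like:

    // Sketch only: append the event's diagnostic text to the attempt
    // without changing the attempt's state.
    private static class DiagnosticInformationUpdater implements
        SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
      @Override
      public void transition(TaskAttemptImpl taskAttempt,
          TaskAttemptEvent event) {
        TaskAttemptDiagnosticsUpdateEvent diagEvent =
            (TaskAttemptDiagnosticsUpdateEvent) event;
        taskAttempt.addDiagnosticInfo(diagEvent.getDiagnosticInfo());
      }
    }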
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1443243&r1=1443242&r2=1443243&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Wed Feb  6 22:22:22 2013
@@ -630,6 +630,106 @@ public class TestTaskAttempt{
         eventHandler.internalError);
   }
 
+  @Test
+  public void testAppDiagnosticEventOnUnassignedTask() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl =
+        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+            splits, jobConf, taListener, mock(Token.class),
+            new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+        "Task got killed"));
+    assertFalse(
+        "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on unassigned task",
+        eventHandler.internalError);
+  }
+
+  @Test
+  public void testAppDiagnosticEventOnNewTask() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, 
+        mock(Token.class), new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+    taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+        "Task got killed"));
+    assertFalse(
+        "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on new task",
+        eventHandler.internalError);
+  }
+    
+  
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
     

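Note on what the two new tests assert: before this patch, delivering TA_DIAGNOSTICS_UPDATE to an attempt in the NEW or UNASSIGNED state was an invalid transition, and the resulting error surfaced through the MockEventHandler used above. The handler's handle() body lies outside the visible hunk; a plausible sketch, assuming it simply flags internal-error job events (JobEvent with JobEventType.INTERNAL_ERROR), is:

    // Sketch, not the committed test code: remember whether the state
    // machine reported an internal error for the job.
    public static class MockEventHandler implements EventHandler {
      public boolean internalError;

      public void handle(Event event) {
        if (event instanceof JobEvent
            && ((JobEvent) event).getType() == JobEventType.INTERNAL_ERROR) {
          internalError = true;
        }
      }
    }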
