http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index b8e6f43..15028f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -28,6 +28,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -35,6 +36,7 @@ import static org.mockito.Mockito.when; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -91,6 +93,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -151,6 +158,7 @@ public class TestRMAppAttemptTransitions { private NMTokenSecretManagerInRM nmTokenManager = spy(new NMTokenSecretManagerInRM(conf)); private boolean transferStateFromPreviousAttempt = false; + private EventHandler<RMNodeEvent> rmnodeEventHandler; private final class TestApplicationAttemptEventDispatcher implements EventHandler<RMAppAttemptEvent> { @@ -203,7 +211,7 @@ public class TestRMAppAttemptTransitions { applicationMasterLauncher.handle(event); } } - + private static int appId = 1; private ApplicationSubmissionContext submissionContext = null; @@ -268,6 +276,9 @@ public class TestRMAppAttemptTransitions { rmDispatcher.register(AMLauncherEventType.class, new TestAMLauncherEventDispatcher()); + rmnodeEventHandler = mock(RMNodeImpl.class); + rmDispatcher.register(RMNodeEventType.class, rmnodeEventHandler); + rmDispatcher.init(conf); rmDispatcher.start(); @@ -575,6 +586,8 @@ public class TestRMAppAttemptTransitions { } assertEquals(finishedContainerCount, applicationAttempt .getJustFinishedContainers().size()); + Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt) + .size()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); @@ -704,7 +717,8 @@ public class TestRMAppAttemptTransitions { application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(), container.getNodeId())); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); + applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class), + container.getNodeId())); // complete AM String diagnostics = "Successful"; FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; @@ -752,10 +766,11 @@ public class TestRMAppAttemptTransitions { when(appResUsgRpt.getMemorySeconds()).thenReturn(223456L); when(appResUsgRpt.getVcoreSeconds()).thenReturn(75544L); sendAttemptUpdateSavedEvent(applicationAttempt); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( attemptId, ContainerStatus.newInstance( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId)); when(scheduler.getSchedulerAppInfo(eq(attemptId))).thenReturn(null); @@ -857,8 +872,9 @@ public class TestRMAppAttemptTransitions { SchedulerUtils.LOST_CONTAINER); // send CONTAINER_FINISHED event at SCHEDULED state, // The state should be FINAL_SAVING with previous state SCHEDULED + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, anyNodeId)); // createApplicationAttemptState will return previous state (SCHEDULED), // if the current state is FINAL_SAVING. assertEquals(YarnApplicationAttemptState.SCHEDULED, @@ -904,8 +920,9 @@ public class TestRMAppAttemptTransitions { ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(), ContainerState.COMPLETE, containerDiagMsg, exitCode); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - applicationAttempt.getAppAttemptId(), cs)); + applicationAttempt.getAppAttemptId(), cs, anyNodeId)); assertEquals(YarnApplicationAttemptState.ALLOCATED, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -928,16 +945,17 @@ public class TestRMAppAttemptTransitions { ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(), ContainerState.COMPLETE, containerDiagMsg, exitCode); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs)); + appAttemptId, cs, anyNodeId)); // ignored ContainerFinished and Expire at FinalSaving if we were supposed // to Failed state. assertEquals(RMAppAttemptState.FINAL_SAVING, - applicationAttempt.getAppAttemptState()); + applicationAttempt.getAppAttemptState()); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -947,7 +965,7 @@ public class TestRMAppAttemptTransitions { sendAttemptUpdateSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.FAILED, applicationAttempt.getAppAttemptState()); - assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertEquals(2, applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", @@ -971,10 +989,11 @@ public class TestRMAppAttemptTransitions { // ignored ContainerFinished and Expire at FinalSaving if we were supposed // to Killed state. assertEquals(RMAppAttemptState.FINAL_SAVING, - applicationAttempt.getAppAttemptState()); + applicationAttempt.getAppAttemptState()); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId)); applicationAttempt.handle(new RMAppAttemptEvent( applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE)); assertEquals(RMAppAttemptState.FINAL_SAVING, @@ -984,7 +1003,7 @@ public class TestRMAppAttemptTransitions { sendAttemptUpdateSavedEvent(applicationAttempt); assertEquals(RMAppAttemptState.KILLED, applicationAttempt.getAppAttemptState()); - assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); + assertEquals(1,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", @@ -1144,13 +1163,14 @@ public class TestRMAppAttemptTransitions { unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl, diagnostics); // container must be AM container to move from FINISHING to FINISHED + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle( new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( BuilderUtils.newContainerId( applicationAttempt.getAppAttemptId(), 42), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), anyNodeId)); testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl, diagnostics); } @@ -1165,13 +1185,14 @@ public class TestRMAppAttemptTransitions { String diagnostics = "Successful"; unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl, diagnostics); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle( new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(amContainer.getId(), - ContainerState.COMPLETE, "", 0))); + ContainerState.COMPLETE, "", 0), anyNodeId)); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, - diagnostics, 0, false); + diagnostics, 1, false); } // While attempt is at FINAL_SAVING, Contaienr_Finished event may come before @@ -1195,15 +1216,16 @@ public class TestRMAppAttemptTransitions { assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); // Container_finished event comes before Attempt_Saved event. + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus( - amContainer.getId(), ContainerState.COMPLETE, "", 0))); + amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId)); assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); // send attempt_saved sendAttemptUpdateSavedEvent(applicationAttempt); testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl, - diagnostics, 0, false); + diagnostics, 1, false); } // While attempt is at FINAL_SAVING, Expire event may come before @@ -1235,6 +1257,71 @@ public class TestRMAppAttemptTransitions { diagnostics, 0, false); } + @Test + public void testFinishedContainer() { + Container amContainer = allocateApplicationAttempt(); + launchApplicationAttempt(amContainer); + runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false); + + // Complete one container + ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt + .getAppAttemptId(), 2); + Container container1 = mock(Container.class); + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + when(container1.getId()).thenReturn( + containerId1); + when(containerStatus1.getContainerId()).thenReturn(containerId1); + when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234)); + + application.handle(new RMAppRunningOnNodeEvent(application + .getApplicationId(), + container1.getNodeId())); + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), containerStatus1, + container1.getNodeId())); + + ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor = + ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class); + + // Verify justFinishedContainers + Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers() + .size()); + Assert.assertEquals(container1.getId(), applicationAttempt + .getJustFinishedContainers().get(0).getContainerId()); + Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt) + .size()); + + // Verify finishedContainersSentToAM gets container after pull + List<ContainerStatus> containerStatuses = applicationAttempt + .pullJustFinishedContainers(); + Assert.assertEquals(1, containerStatuses.size()); + Mockito.verify(rmnodeEventHandler, never()).handle(Mockito + .any(RMNodeEvent.class)); + Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty()); + Assert.assertEquals(1, getFinishedContainersSentToAM(applicationAttempt) + .size()); + + // Verify container is acked to NM via the RMNodeEvent after second pull + containerStatuses = applicationAttempt.pullJustFinishedContainers(); + Assert.assertEquals(0, containerStatuses.size()); + Mockito.verify(rmnodeEventHandler).handle(captor.capture()); + Assert.assertEquals(container1.getId(), captor.getValue().getContainers() + .get(0)); + Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty()); + Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt) + .size()); + } + + private static List<ContainerStatus> getFinishedContainersSentToAM( + RMAppAttempt applicationAttempt) { + List<ContainerStatus> containers = new ArrayList<ContainerStatus>(); + for (List<ContainerStatus> containerStatuses: applicationAttempt + .getFinishedContainersSentToAMReference().values()) { + containers.addAll(containerStatuses); + } + return containers; + } + // this is to test user can get client tokens only after the client token // master key is saved in the state store and also registered in // ClientTokenSecretManager @@ -1281,8 +1368,9 @@ public class TestRMAppAttemptTransitions { ContainerStatus.newInstance(amContainer.getId(), ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, anyNodeId)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt); @@ -1293,15 +1381,21 @@ public class TestRMAppAttemptTransitions { verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); // failed attempt captured the container finished event. - assertEquals(0, applicationAttempt.getJustFinishedContainers().size()); + assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); ContainerStatus cs2 = ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2), ContainerState.COMPLETE, "", 0); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs2)); - assertEquals(1, applicationAttempt.getJustFinishedContainers().size()); - assertEquals(cs2.getContainerId(), applicationAttempt - .getJustFinishedContainers().get(0).getContainerId()); + appAttemptId, cs2, anyNodeId)); + assertEquals(2, applicationAttempt.getJustFinishedContainers().size()); + boolean found = false; + for (ContainerStatus containerStatus:applicationAttempt + .getJustFinishedContainers()) { + if (cs2.getContainerId().equals(containerStatus.getContainerId())) { + found = true; + } + } + assertTrue(found); } @@ -1322,8 +1416,9 @@ public class TestRMAppAttemptTransitions { ContainerStatus.newInstance(amContainer.getId(), ContainerState.COMPLETE, "some error", 123); ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId(); + NodeId anyNodeId = NodeId.newInstance("host", 1234); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( - appAttemptId, cs1)); + appAttemptId, cs1, anyNodeId)); assertEquals(YarnApplicationAttemptState.RUNNING, applicationAttempt.createApplicationAttemptState()); sendAttemptUpdateSavedEvent(applicationAttempt);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a641496/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index b3dc35f..a0d5d84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -161,7 +161,7 @@ public class TestAMRMTokens { .getEventHandler() .handle( new RMAppAttemptContainerFinishedEvent(applicationAttemptId, - containerStatus)); + containerStatus, nm1.getNodeId())); // Make sure the RMAppAttempt is at Finished State. // Both AMRMToken and ClientToAMToken have been removed.