Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Tue Aug 19 23:49:39 2014 @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -75,17 +76,18 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -94,6 +96,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -113,7 +116,7 @@ public class TestRMAppAttemptTransitions private static final String EMPTY_DIAGNOSTICS = ""; private static final String RM_WEBAPP_ADDR = - WebAppUtils.getResolvedRMWebAppURLWithoutScheme(new Configuration()); + WebAppUtils.getResolvedRMWebAppURLWithScheme(new Configuration()); private boolean isSecurityEnabled; private RMContext rmContext; @@ -130,7 +133,8 @@ public class TestRMAppAttemptTransitions private RMAppAttempt applicationAttempt; private Configuration conf = new Configuration(); - private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf)); + private AMRMTokenSecretManager amRMTokenManager = + spy(new AMRMTokenSecretManager(conf, rmContext)); private ClientToAMTokenSecretManagerInRM clientToAMTokenManager = spy(new ClientToAMTokenSecretManagerInRM()); private NMTokenSecretManagerInRM nmTokenManager = @@ -221,6 +225,8 @@ public class TestRMAppAttemptTransitions amLivelinessMonitor = mock(AMLivelinessMonitor.class); amFinishingMonitor = mock(AMLivelinessMonitor.class); writer = mock(RMApplicationHistoryWriter.class); + MasterKeyData masterKeyData = amRMTokenManager.createNewMasterKey(); + when(amRMTokenManager.getMasterKey()).thenReturn(masterKeyData); rmContext = new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, @@ -315,7 +321,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); assertNotNull(applicationAttempt.getTrackingUrl()); assertFalse("N/A".equals(applicationAttempt.getTrackingUrl())); @@ -331,7 +337,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); if (UserGroupInformation.isSecurityEnabled()) { verify(clientToAMTokenManager).createMasterKey( @@ -341,7 +347,6 @@ public class TestRMAppAttemptTransitions assertNull(applicationAttempt.createClientToken("some client")); } assertNull(applicationAttempt.createClientToken(null)); - assertNotNull(applicationAttempt.getAMRMToken()); // Check events verify(masterService). registerAppAttempt(applicationAttempt.getAppAttemptId()); @@ -359,7 +364,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); // Check events @@ -385,7 +390,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyAttemptFinalStateSaved(); @@ -425,7 +430,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertNull(applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); assertNull(applicationAttempt.getFinalApplicationStatus()); } @@ -437,7 +442,6 @@ public class TestRMAppAttemptTransitions assertEquals(RMAppAttemptState.ALLOCATED, applicationAttempt.getAppAttemptState()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); - // Check events verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class)); verify(scheduler, times(2)). @@ -461,7 +465,7 @@ public class TestRMAppAttemptTransitions assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); // Check events verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class)); @@ -564,15 +568,15 @@ public class TestRMAppAttemptTransitions submitApplicationAttempt(); applicationAttempt.handle( new RMAppAttemptEvent( - applicationAttempt.getAppAttemptId(), + applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.ATTEMPT_ADDED)); if(unmanagedAM){ assertEquals(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, applicationAttempt.getAppAttemptState()); applicationAttempt.handle( - new RMAppAttemptNewSavedEvent( - applicationAttempt.getAppAttemptId(), null)); + new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); } testAppAttemptScheduledState(); @@ -599,6 +603,9 @@ public class TestRMAppAttemptTransitions any(List.class), any(List.class))). thenReturn(allocation); + RMContainer rmContainer = mock(RMContainerImpl.class); + when(scheduler.getRMContainer(container.getId())). + thenReturn(rmContainer); applicationAttempt.handle( new RMAppAttemptContainerAllocatedEvent( @@ -607,8 +614,8 @@ public class TestRMAppAttemptTransitions assertEquals(RMAppAttemptState.ALLOCATED_SAVING, applicationAttempt.getAppAttemptState()); applicationAttempt.handle( - new RMAppAttemptNewSavedEvent( - applicationAttempt.getAppAttemptId(), null)); + new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); testAppAttemptAllocatedState(container); @@ -666,8 +673,10 @@ public class TestRMAppAttemptTransitions runApplicationAttempt(null, "host", 8042, url, true); // complete a container - applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent( - applicationAttempt.getAppAttemptId(), mock(Container.class))); + Container container = mock(Container.class); + when(container.getNodeId()).thenReturn(NodeId.newInstance("host", 1234)); + application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(), + container.getNodeId())); applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); // complete AM @@ -685,8 +694,8 @@ public class TestRMAppAttemptTransitions assertEquals(RMAppAttemptState.FINAL_SAVING, applicationAttempt.getAppAttemptState()); applicationAttempt.handle( - new RMAppAttemptUpdateSavedEvent( - applicationAttempt.getAppAttemptId(), null)); + new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), + RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); } @Test @@ -771,6 +780,32 @@ public class TestRMAppAttemptTransitions } @Test + public void testAMCrashAtScheduled() { + // This is to test sending CONTAINER_FINISHED event at SCHEDULED state. + // Verify the state transition is correct. + scheduleApplicationAttempt(); + ContainerStatus cs = + SchedulerUtils.createAbnormalContainerStatus( + BuilderUtils.newContainerId( + applicationAttempt.getAppAttemptId(), 1), + SchedulerUtils.LOST_CONTAINER); + // send CONTAINER_FINISHED event at SCHEDULED state, + // The state should be FINAL_SAVING with previous state SCHEDULED + applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( + applicationAttempt.getAppAttemptId(), cs)); + // createApplicationAttemptState will return previous state (SCHEDULED), + // if the current state is FINAL_SAVING. + assertEquals(YarnApplicationAttemptState.SCHEDULED, + applicationAttempt.createApplicationAttemptState()); + // send ATTEMPT_UPDATE_SAVED event, + // verify the state is changed to state FAILED. + sendAttemptUpdateSavedEvent(applicationAttempt); + assertEquals(RMAppAttemptState.FAILED, + applicationAttempt.getAppAttemptState()); + verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); + } + + @Test public void testAllocatedToKilled() { Container amContainer = allocateApplicationAttempt(); applicationAttempt.handle( @@ -812,6 +847,9 @@ public class TestRMAppAttemptTransitions applicationAttempt.getAppAttemptState()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); + boolean shouldCheckURL = (applicationAttempt.getTrackingUrl() != null); + verifyAMCrashAtAllocatedDiagnosticInfo(applicationAttempt.getDiagnostics(), + exitCode, shouldCheckURL); } @Test @@ -845,7 +883,7 @@ public class TestRMAppAttemptTransitions applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); @@ -882,7 +920,7 @@ public class TestRMAppAttemptTransitions applicationAttempt.getAppAttemptState()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(amContainer, applicationAttempt.getMasterContainer()); - assertEquals(0, applicationAttempt.getRanNodes().size()); + assertEquals(0, application.getRanNodes().size()); String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); @@ -1229,6 +1267,20 @@ public class TestRMAppAttemptTransitions verifyApplicationAttemptFinished(RMAppAttemptState.FAILED); } + private void verifyAMCrashAtAllocatedDiagnosticInfo(String diagnostics, + int exitCode, boolean shouldCheckURL) { + assertTrue("Diagnostic information does not point the logs to the users", + diagnostics.contains("logs")); + assertTrue("Diagnostic information does not contain application attempt id", + diagnostics.contains(applicationAttempt.getAppAttemptId().toString())); + assertTrue("Diagnostic information does not contain application exit code", + diagnostics.contains("exitCode: " + exitCode)); + if (shouldCheckURL) { + assertTrue("Diagnostic information does not contain application proxy URL", + diagnostics.contains(applicationAttempt.getWebProxyBase())); + } + } + private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) { verify(amRMTokenManager, times(count)).applicationMasterFinished(appAttemptId); if (UserGroupInformation.isSecurityEnabled()) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java Tue Aug 19 23:49:39 2014 @@ -26,6 +26,9 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -204,4 +214,36 @@ public class TestRMContainerImpl { assertEquals(RMContainerState.RUNNING, rmContainer.getState()); verify(writer, never()).containerFinished(any(RMContainer.class)); } + + @Test + public void testExistenceOfResourceRequestInRMContainer() throws Exception { + Configuration conf = new Configuration(); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000); + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + ResourceScheduler scheduler = rm1.getResourceScheduler(); + + // request a container. + am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>()); + ContainerId containerId2 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // Verify whether list of ResourceRequest is present in RMContainer + // while moving to ALLOCATED state + Assert.assertNotNull(scheduler.getRMContainer(containerId2) + .getResourceRequests()); + + // Allocate container + am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()) + .getAllocatedContainers(); + rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED); + + // After RMContainer moving to ACQUIRED state, list of ResourceRequest will + // be empty + Assert.assertNull(scheduler.getRMContainer(containerId2) + .getResourceRequests()); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java Tue Aug 19 23:49:39 2014 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; @@ -61,10 +62,15 @@ public class TestSchedulerApplicationAtt QueueMetrics newMetrics = newQueue.getMetrics(); ApplicationAttemptId appAttId = createAppAttemptId(0, 0); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getEpoch()).thenReturn(3); SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, - user, oldQueue, oldQueue.getActiveUsersManager(), null); + user, oldQueue, oldQueue.getActiveUsersManager(), rmContext); oldMetrics.submitApp(user); + // confirm that containerId is calculated based on epoch. + assertEquals(app.getNewContainerId(), 0x00c00001); + // Resource request Resource requestedResource = Resource.newInstance(1536, 2); Priority requestedPriority = Priority.newInstance(2); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java Tue Aug 19 23:49:39 2014 @@ -384,15 +384,18 @@ public class TestSchedulerUtils { Assert.assertEquals(ContainerExitStatus.PREEMPTED, cd.getExitStatus()); } - public static <T> SchedulerApplication verifyAppAddedAndRemovedFromScheduler( - final Map<ApplicationId, SchedulerApplication> applications, - EventHandler<SchedulerEvent> handler, String queueName) throws Exception { + public static SchedulerApplication<SchedulerApplicationAttempt> + verifyAppAddedAndRemovedFromScheduler( + Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications, + EventHandler<SchedulerEvent> handler, String queueName) + throws Exception { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(appId, queueName, "user"); handler.handle(appAddedEvent); - SchedulerApplication app = applications.get(appId); + SchedulerApplication<SchedulerApplicationAttempt> app = + applications.get(appId); // verify application is added. Assert.assertNotNull(app); Assert.assertEquals("user", app.getUser()); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Tue Aug 19 23:49:39 2014 @@ -81,7 +81,7 @@ public class TestApplicationLimits { thenReturn(Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 32)); - when(csContext.getClusterResources()). + when(csContext.getClusterResource()). thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); @@ -165,7 +165,7 @@ public class TestApplicationLimits { // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16); - when(csContext.getClusterResources()).thenReturn(clusterResource); + when(csContext.getClusterResource()).thenReturn(clusterResource); Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); CSQueue root = @@ -478,7 +478,7 @@ public class TestApplicationLimits { // Say cluster has 100 nodes of 16G each Resource clusterResource = Resources.createResource(100 * 16 * GB); - when(csContext.getClusterResources()).thenReturn(clusterResource); + when(csContext.getClusterResource()).thenReturn(clusterResource); Map<String, CSQueue> queues = new HashMap<String, CSQueue>(); CapacityScheduler.parseQueue(csContext, csConf, null, "root", Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Tue Aug 19 23:49:39 2014 @@ -25,22 +25,37 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedAction; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; - -import org.junit.Assert; +import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.LocalConfigurationProvider; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -48,18 +63,36 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.resourcemanager.Application; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; +import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; +import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -69,9 +102,14 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -119,13 +157,16 @@ public class TestCapacityScheduler { @After public void tearDown() throws Exception { - resourceManager.stop(); + if (resourceManager != null) { + resourceManager.stop(); + } } @Test (timeout = 30000) public void testConfValidation() throws Exception { ResourceScheduler scheduler = new CapacityScheduler(); + scheduler.setRMContext(resourceManager.getRMContext()); Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); @@ -340,18 +381,23 @@ public class TestCapacityScheduler { public void testRefreshQueues() throws Exception { CapacityScheduler cs = new CapacityScheduler(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); setupQueueConfiguration(conf); cs.setConf(new YarnConfiguration()); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(conf); + cs.start(); + cs.reinitialize(conf, rmContext); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); conf.setCapacity(A, 80f); conf.setCapacity(B, 20f); cs.reinitialize(conf, mockContext); checkQueueCapacities(cs, 80f, 20f); + cs.stop(); } private void checkQueueCapacities(CapacityScheduler cs, @@ -454,6 +500,9 @@ public class TestCapacityScheduler { setupQueueConfiguration(csConf); CapacityScheduler cs = new CapacityScheduler(); cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(csConf); + cs.start(); cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(csConf), new NMTokenSecretManagerInRM(csConf), @@ -465,14 +514,15 @@ public class TestCapacityScheduler { cs.handle(new NodeAddedSchedulerEvent(n1)); cs.handle(new NodeAddedSchedulerEvent(n2)); - Assert.assertEquals(6 * GB, cs.getClusterResources().getMemory()); + Assert.assertEquals(6 * GB, cs.getClusterResource().getMemory()); // reconnect n1 with downgraded memory n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 1); cs.handle(new NodeRemovedSchedulerEvent(n1)); cs.handle(new NodeAddedSchedulerEvent(n1)); - Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory()); + Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory()); + cs.stop(); } @Test @@ -481,6 +531,9 @@ public class TestCapacityScheduler { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(conf); + cs.start(); cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), @@ -511,6 +564,7 @@ public class TestCapacityScheduler { assertEquals(queueB, queueB4.getParent()); } finally { B3_CAPACITY += B4_CAPACITY; + cs.stop(); } } @Test @@ -627,17 +681,17 @@ public class TestCapacityScheduler { @Test public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { - - AsyncDispatcher rmDispatcher = new AsyncDispatcher(); - CapacityScheduler cs = new CapacityScheduler(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); - cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null, - null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + @SuppressWarnings("unchecked") + AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode> cs = + (AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>) rm + .getResourceScheduler(); - SchedulerApplication app = + SchedulerApplication<SchedulerApplicationAttempt> app = TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( cs.getSchedulerApplications(), cs, "a1"); Assert.assertEquals("a1", app.getQueue().getQueueName()); @@ -667,5 +721,1173 @@ public class TestCapacityScheduler { CapacityScheduler.schedule(cs); } } + + private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) + throws Exception { + RMAppAttempt attempt = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + return am; + } + + private void waitForAppPreemptionInfo(RMApp app, Resource preempted, + int numAMPreempted, int numTaskPreempted, + Resource currentAttemptPreempted, boolean currentAttemptAMPreempted, + int numLatestAttemptTaskPreempted) throws InterruptedException { + while (true) { + RMAppMetrics appPM = app.getRMAppMetrics(); + RMAppAttemptMetrics attemptPM = + app.getCurrentAppAttempt().getRMAppAttemptMetrics(); + + if (appPM.getResourcePreempted().equals(preempted) + && appPM.getNumAMContainersPreempted() == numAMPreempted + && appPM.getNumNonAMContainersPreempted() == numTaskPreempted + && attemptPM.getResourcePreempted().equals(currentAttemptPreempted) + && app.getCurrentAppAttempt().getRMAppAttemptMetrics() + .getIsPreempted() == currentAttemptAMPreempted + && attemptPM.getNumNonAMContainersPreempted() == + numLatestAttemptTaskPreempted) { + return; + } + Thread.sleep(500); + } + } + + private void waitForNewAttemptCreated(RMApp app, + ApplicationAttemptId previousAttemptId) throws InterruptedException { + while (app.getCurrentAppAttempt().equals(previousAttemptId)) { + Thread.sleep(500); + } + } + + @Test(timeout = 30000) + public void testAllocateDoesNotBlockOnSchedulerLock() throws Exception { + final YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MyContainerManager containerManager = new MyContainerManager(); + final MockRMWithAMS rm = + new MockRMWithAMS(conf, containerManager); + rm.start(); + + MockNM nm1 = rm.registerNode("localhost:1234", 5120); + + Map<ApplicationAccessType, String> acls = + new HashMap<ApplicationAccessType, String>(2); + acls.put(ApplicationAccessType.VIEW_APP, "*"); + RMApp app = rm.submitApp(1024, "appname", "appuser", acls); + + nm1.nodeHeartbeat(true); + + RMAppAttempt attempt = app.getCurrentAppAttempt(); + ApplicationAttemptId applicationAttemptId = attempt.getAppAttemptId(); + int msecToWait = 10000; + int msecToSleep = 100; + while (attempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED + && msecToWait > 0) { + LOG.info("Waiting for AppAttempt to reach LAUNCHED state. " + + "Current state is " + attempt.getAppAttemptState()); + Thread.sleep(msecToSleep); + msecToWait -= msecToSleep; + } + Assert.assertEquals(attempt.getAppAttemptState(), + RMAppAttemptState.LAUNCHED); + + // Create a client to the RM. + final YarnRPC rpc = YarnRPC.create(conf); + + UserGroupInformation currentUser = + UserGroupInformation.createRemoteUser(applicationAttemptId.toString()); + Credentials credentials = containerManager.getContainerCredentials(); + final InetSocketAddress rmBindAddress = + rm.getApplicationMasterService().getBindAddress(); + Token<? extends TokenIdentifier> amRMToken = + MockRMWithAMS.setupAndReturnAMRMToken(rmBindAddress, + credentials.getAllTokens()); + currentUser.addToken(amRMToken); + ApplicationMasterProtocol client = + currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() { + @Override + public ApplicationMasterProtocol run() { + return (ApplicationMasterProtocol) rpc.getProxy( + ApplicationMasterProtocol.class, rmBindAddress, conf); + } + }); + + RegisterApplicationMasterRequest request = + RegisterApplicationMasterRequest.newInstance("localhost", 12345, ""); + client.registerApplicationMaster(request); + + // grab the scheduler lock from another thread + // and verify an allocate call in this thread doesn't block on it + final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + final CyclicBarrier barrier = new CyclicBarrier(2); + Thread otherThread = new Thread(new Runnable() { + @Override + public void run() { + synchronized(cs) { + try { + barrier.await(); + barrier.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + } + }); + otherThread.start(); + barrier.await(); + AllocateRequest allocateRequest = + AllocateRequest.newInstance(0, 0.0f, null, null, null); + client.allocate(allocateRequest); + barrier.await(); + otherThread.join(); + + rm.stop(); + } + + @Test + public void testNumClusterNodes() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + CapacityScheduler cs = new CapacityScheduler(); + cs.setConf(conf); + RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + cs.setRMContext(rmContext); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + cs.init(csConf); + cs.start(); + assertEquals(0, cs.getNumClusterNodes()); + + RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1); + RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(2 * GB), 2); + cs.handle(new NodeAddedSchedulerEvent(n1)); + cs.handle(new NodeAddedSchedulerEvent(n2)); + assertEquals(2, cs.getNumClusterNodes()); + + cs.handle(new NodeRemovedSchedulerEvent(n1)); + assertEquals(1, cs.getNumClusterNodes()); + cs.handle(new NodeAddedSchedulerEvent(n1)); + assertEquals(2, cs.getNumClusterNodes()); + cs.handle(new NodeRemovedSchedulerEvent(n2)); + cs.handle(new NodeRemovedSchedulerEvent(n1)); + assertEquals(0, cs.getNumClusterNodes()); + + cs.stop(); + } + + @Test(timeout = 120000) + public void testPreemptionInfo() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + int CONTAINER_MEMORY = 1024; // start RM + MockRM rm1 = new MockRM(conf); + rm1.start(); + + // get scheduler + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // start NM + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = rm1.submitApp(CONTAINER_MEMORY); + MockAM am0 = launchAM(app0, rm1, nm1); + + // get scheduler app + FiCaSchedulerApp schedulerAppAttempt = + cs.getSchedulerApplications().get(app0.getApplicationId()) + .getCurrentAppAttempt(); + + // allocate some containers and launch them + List<Container> allocatedContainers = + am0.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); + + // kill the 3 containers + for (Container c : allocatedContainers) { + cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); + } + + // check values + waitForAppPreemptionInfo(app0, + Resource.newInstance(CONTAINER_MEMORY * 3, 3), 0, 3, + Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); + + // kill app0-attempt0 AM container + cs.killContainer(schedulerAppAttempt.getRMContainer(app0 + .getCurrentAppAttempt().getMasterContainer().getId())); + + // wait for app0 failed + waitForNewAttemptCreated(app0, am0.getApplicationAttemptId()); + + // check values + waitForAppPreemptionInfo(app0, + Resource.newInstance(CONTAINER_MEMORY * 4, 4), 1, 3, + Resource.newInstance(0, 0), false, 0); + + // launch app0-attempt1 + MockAM am1 = launchAM(app0, rm1, nm1); + schedulerAppAttempt = + cs.getSchedulerApplications().get(app0.getApplicationId()) + .getCurrentAppAttempt(); + + // allocate some containers and launch them + allocatedContainers = + am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); + for (Container c : allocatedContainers) { + cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); + } + + // check values + waitForAppPreemptionInfo(app0, + Resource.newInstance(CONTAINER_MEMORY * 7, 7), 1, 6, + Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); + + rm1.stop(); + } + + @Test(timeout = 30000) + public void testRecoverRequestAfterPreemption() throws Exception { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000); + RMApp app1 = rm1.submitApp(1024); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // request a container. + am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>()); + ContainerId containerId1 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED); + + RMContainer rmContainer = cs.getRMContainer(containerId1); + List<ResourceRequest> requests = rmContainer.getResourceRequests(); + FiCaSchedulerApp app = cs.getApplicationAttempt(am1 + .getApplicationAttemptId()); + + FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode()); + for (ResourceRequest request : requests) { + // Skip the OffRack and RackLocal resource requests. + if (request.getResourceName().equals(node.getRackName()) + || request.getResourceName().equals(ResourceRequest.ANY)) { + continue; + } + + // Already the node local resource request is cleared from RM after + // allocation. + Assert.assertNull(app.getResourceRequest(request.getPriority(), + request.getResourceName())); + } + + // Call killContainer to preempt the container + cs.killContainer(rmContainer); + + Assert.assertEquals(3, requests.size()); + for (ResourceRequest request : requests) { + // Resource request must have added back in RM after preempt event + // handling. + Assert.assertEquals( + 1, + app.getResourceRequest(request.getPriority(), + request.getResourceName()).getNumContainers()); + } + + // New container will be allocated and will move to ALLOCATED state + ContainerId containerId2 = ContainerId.newInstance( + am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED); + + // allocate container + List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(), + new ArrayList<ContainerId>()).getAllocatedContainers(); + + // Now with updated ResourceRequest, a container is allocated for AM. + Assert.assertTrue(containers.size() == 1); + } + + private MockRM setUpMove() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + return rm; + } + + @Test + public void testMoveAppBasic() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + scheduler.moveApplication(app.getApplicationId(), "b1"); + + // check postconditions + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("b1")); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(1, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAppSameParent() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInA2 = scheduler.getAppsInQueue("a2"); + assertTrue(appsInA2.isEmpty()); + + // now move the app + scheduler.moveApplication(app.getApplicationId(), "a2"); + + // check postconditions + appsInA2 = scheduler.getAppsInQueue("a2"); + assertEquals(1, appsInA2.size()); + queue = + scheduler.getApplicationAttempt(appsInA2.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a2")); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + rm.stop(); + } + + @Test + public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(4 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(2 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(1 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // task_0_0 task_1_0 allocated, used=2G + nodeUpdate(nm_0); + + // nothing allocated + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(1 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G + // available + checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available + + // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5% + // total cap) + scheduler.moveApplication(application_0.getApplicationId(), "b1"); + + // 2GB 1C + Task task_1_1 = + new Task(application_1, priority_0, + new String[] { ResourceRequest.ANY }); + application_1.addTask(task_1_1); + + application_1.schedule(); + + // 2GB 1C + Task task_0_1 = + new Task(application_0, priority_0, new String[] { host_0, host_1 }); + application_0.addTask(task_0_1); + + application_0.schedule(); + + // prev 2G used free 2G + nodeUpdate(nm_0); + + // prev 0G used free 2G + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_1.schedule(); + checkApplicationResourceUsage(3 * GB, application_1); + + // Get allocations from the scheduler + application_0.schedule(); + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(2 * GB, nm_1); + + } + + @Test + public void testMoveAppSuccess() throws Exception { + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // b2 can only run 1 app at a time + scheduler.moveApplication(application_0.getApplicationId(), "b2"); + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(0 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is + // not scheduled + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + // lets move application_0 to a queue where it can run + scheduler.moveApplication(application_0.getApplicationId(), "a2"); + application_0.schedule(); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(3 * GB, nm_1); + + } + + @Test(expected = YarnException.class) + public void testMoveAppViolateQueueState() throws Exception { + + resourceManager = new ResourceManager(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + StringBuilder qState = new StringBuilder(); + qState.append(CapacitySchedulerConfiguration.PREFIX).append(B) + .append(CapacitySchedulerConfiguration.DOT) + .append(CapacitySchedulerConfiguration.STATE); + csConf.set(qState.toString(), QueueState.STOPPED.name()); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + resourceManager.init(conf); + resourceManager.getRMContext().getContainerTokenSecretManager() + .rollMasterKey(); + resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey(); + ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start(); + mockContext = mock(RMContext.class); + when(mockContext.getConfigurationProvider()).thenReturn( + new LocalConfigurationProvider()); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(6 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0 }); + application_0.addTask(task_0_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + + // task_0_0 allocated + nodeUpdate(nm_0); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(3 * GB, nm_0); + // b2 queue contains 3GB consumption app, + // add another 3GB will hit max capacity limit on queue b + scheduler.moveApplication(application_0.getApplicationId(), "b1"); + + } + + @Test + public void testMoveAppQueueMetricsCheck() throws Exception { + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + CapacityScheduler cs = + (CapacityScheduler) resourceManager.getResourceScheduler(); + CSQueue origRootQ = cs.getRootQueue(); + CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ); + int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues()); + int origNumAppsRoot = origRootQ.getNumApplications(); + + scheduler.moveApplication(application_0.getApplicationId(), "a2"); + + CSQueue newRootQ = cs.getRootQueue(); + int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues()); + int newNumAppsRoot = newRootQ.getNumApplications(); + CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ); + CapacitySchedulerLeafQueueInfo origOldA1 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues()); + CapacitySchedulerLeafQueueInfo origNewA1 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues()); + CapacitySchedulerLeafQueueInfo targetOldA2 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues()); + CapacitySchedulerLeafQueueInfo targetNewA2 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues()); + // originally submitted here + assertEquals(1, origOldA1.getNumApplications()); + assertEquals(1, origNumAppsA); + assertEquals(2, origNumAppsRoot); + // after the move + assertEquals(0, origNewA1.getNumApplications()); + assertEquals(1, newNumAppsA); + assertEquals(2, newNumAppsRoot); + // original consumption on a1 + assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemory()); + assertEquals(1, origOldA1.getResourcesUsed().getvCores()); + assertEquals(0, origNewA1.getResourcesUsed().getMemory()); // after the move + assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move + // app moved here with live containers + assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemory()); + assertEquals(1, targetNewA2.getResourcesUsed().getvCores()); + // it was empty before the move + assertEquals(0, targetOldA2.getNumApplications()); + assertEquals(0, targetOldA2.getResourcesUsed().getMemory()); + assertEquals(0, targetOldA2.getResourcesUsed().getvCores()); + // after the app moved here + assertEquals(1, targetNewA2.getNumApplications()); + // 1 container on original queue before move + assertEquals(1, origOldA1.getNumContainers()); + // after the move the resource released + assertEquals(0, origNewA1.getNumContainers()); + // and moved to the new queue + assertEquals(1, targetNewA2.getNumContainers()); + // which originally didn't have any + assertEquals(0, targetOldA2.getNumContainers()); + // 1 user with 3GB + assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0) + .getResourcesUsed().getMemory()); + // 1 user with 1 core + assertEquals(1, origOldA1.getUsers().getUsersList().get(0) + .getResourcesUsed().getvCores()); + // user ha no more running app in the orig queue + assertEquals(0, origNewA1.getUsers().getUsersList().size()); + // 1 user with 3GB + assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0) + .getResourcesUsed().getMemory()); + // 1 user with 1 core + assertEquals(1, targetNewA2.getUsers().getUsersList().get(0) + .getResourcesUsed().getvCores()); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is + // not scheduled + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + } + + private int getNumAppsInQueue(String name, List<CSQueue> queues) { + for (CSQueue queue : queues) { + if (queue.getQueueName().equals(name)) { + return queue.getNumApplications(); + } + } + return -1; + } + + private CapacitySchedulerQueueInfo getQueueInfo(String name, + CapacitySchedulerQueueInfoList info) { + if (info != null) { + for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) { + if (queueInfo.getQueueName().equals(name)) { + return queueInfo; + } else { + CapacitySchedulerQueueInfo result = + getQueueInfo(name, queueInfo.getQueues()); + if (result == null) { + continue; + } + return result; + } + } + } + return null; + } + + @Test + public void testMoveAllApps() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + scheduler.moveAllApps("a1", "b1"); + + // check postconditions + Thread.sleep(1000); + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("b1")); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(1, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAllAppsInvalidDestination() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + try { + scheduler.moveAllApps("a1", "DOES_NOT_EXIST"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAllAppsInvalidSource() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + try { + scheduler.moveAllApps("DOES_NOT_EXIST", "b1"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + + @Test + public void testKillAllAppsInQueue() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + // now kill the app + scheduler.killAllAppsInQueue("a1"); + + // check postconditions + rm.waitForState(app.getApplicationId(), RMAppState.KILLED); + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.isEmpty()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testKillAllAppsInvalidSource() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + // now kill the app + try { + scheduler.killAllAppsInQueue("DOES_NOT_EXIST"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + rm.stop(); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java Tue Aug 19 23:49:39 2014 @@ -89,7 +89,7 @@ public class TestChildQueueOrder { Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(16*GB, 32)); - when(csContext.getClusterResources()). + when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java Tue Aug 19 23:49:39 2014 @@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.re import java.util.ArrayList; import java.util.List; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -31,6 +29,7 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; @@ -48,6 +47,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -209,10 +209,11 @@ public class TestContainerAllocation { @Override public Token createContainerToken(ContainerId containerId, - NodeId nodeId, String appSubmitter, Resource capability) { + NodeId nodeId, String appSubmitter, Resource capability, + Priority priority, long createTime) { numRetries++; return super.createContainerToken(containerId, nodeId, appSubmitter, - capability); + capability, priority, createTime); } }; }
