Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Wed Aug 20 01:34:29 2014 @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -68,6 +69,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -100,6 +102,10 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; @@ -1014,4 +1020,874 @@ public class TestCapacityScheduler { // Now with updated ResourceRequest, a container is allocated for AM. Assert.assertTrue(containers.size() == 1); } + + private MockRM setUpMove() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + return rm; + } + + @Test + public void testMoveAppBasic() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + scheduler.moveApplication(app.getApplicationId(), "b1"); + + // check postconditions + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("b1")); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(1, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAppSameParent() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInA2 = scheduler.getAppsInQueue("a2"); + assertTrue(appsInA2.isEmpty()); + + // now move the app + scheduler.moveApplication(app.getApplicationId(), "a2"); + + // check postconditions + appsInA2 = scheduler.getAppsInQueue("a2"); + assertEquals(1, appsInA2.size()); + queue = + scheduler.getApplicationAttempt(appsInA2.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a2")); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + rm.stop(); + } + + @Test + public void testMoveAppForMoveToQueueWithFreeCap() throws Exception { + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(4 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(2 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(1 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // task_0_0 task_1_0 allocated, used=2G + nodeUpdate(nm_0); + + // nothing allocated + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(1 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + checkNodeResourceUsage(2 * GB, nm_0); // task_0_0 (1G) and task_1_0 (1G) 2G + // available + checkNodeResourceUsage(0 * GB, nm_1); // no tasks, 2G available + + // move app from a1(30% cap of total 10.5% cap) to b1(79,2% cap of 89,5% + // total cap) + scheduler.moveApplication(application_0.getApplicationId(), "b1"); + + // 2GB 1C + Task task_1_1 = + new Task(application_1, priority_0, + new String[] { ResourceRequest.ANY }); + application_1.addTask(task_1_1); + + application_1.schedule(); + + // 2GB 1C + Task task_0_1 = + new Task(application_0, priority_0, new String[] { host_0, host_1 }); + application_0.addTask(task_0_1); + + application_0.schedule(); + + // prev 2G used free 2G + nodeUpdate(nm_0); + + // prev 0G used free 2G + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_1.schedule(); + checkApplicationResourceUsage(3 * GB, application_1); + + // Get allocations from the scheduler + application_0.schedule(); + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(2 * GB, nm_1); + + } + + @Test + public void testMoveAppSuccess() throws Exception { + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + // b2 can only run 1 app at a time + scheduler.moveApplication(application_0.getApplicationId(), "b2"); + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(0 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is + // not scheduled + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + // lets move application_0 to a queue where it can run + scheduler.moveApplication(application_0.getApplicationId(), "a2"); + application_0.schedule(); + + nodeUpdate(nm_1); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(1 * GB, nm_0); + checkNodeResourceUsage(3 * GB, nm_1); + + } + + @Test(expected = YarnException.class) + public void testMoveAppViolateQueueState() throws Exception { + + resourceManager = new ResourceManager(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + StringBuilder qState = new StringBuilder(); + qState.append(CapacitySchedulerConfiguration.PREFIX).append(B) + .append(CapacitySchedulerConfiguration.DOT) + .append(CapacitySchedulerConfiguration.STATE); + csConf.set(qState.toString(), QueueState.STOPPED.name()); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + resourceManager.init(conf); + resourceManager.getRMContext().getContainerTokenSecretManager() + .rollMasterKey(); + resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey(); + ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start(); + mockContext = mock(RMContext.class); + when(mockContext.getConfigurationProvider()).thenReturn( + new LocalConfigurationProvider()); + + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(6 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0 }); + application_0.addTask(task_0_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + + // task_0_0 allocated + nodeUpdate(nm_0); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + checkNodeResourceUsage(3 * GB, nm_0); + // b2 queue contains 3GB consumption app, + // add another 3GB will hit max capacity limit on queue b + scheduler.moveApplication(application_0.getApplicationId(), "b1"); + + } + + @Test + public void testMoveAppQueueMetricsCheck() throws Exception { + ResourceScheduler scheduler = resourceManager.getResourceScheduler(); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // Register node2 + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * GB, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit application_0 + Application application_0 = + new Application("user_0", "a1", resourceManager); + application_0.submit(); // app + app attempt event sent to scheduler + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(3 * GB, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * GB, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0, host_1 }); + application_0.addTask(task_0_0); + + // Submit application_1 + Application application_1 = + new Application("user_1", "b2", resourceManager); + application_1.submit(); // app + app attempt event sent to scheduler + + application_1.addNodeManager(host_0, 1234, nm_0); + application_1.addNodeManager(host_1, 1234, nm_1); + + Resource capability_1_0 = Resources.createResource(1 * GB, 1); + application_1.addResourceRequestSpec(priority_1, capability_1_0); + + Resource capability_1_1 = Resources.createResource(2 * GB, 1); + application_1.addResourceRequestSpec(priority_0, capability_1_1); + + Task task_1_0 = + new Task(application_1, priority_1, new String[] { host_0, host_1 }); + application_1.addTask(task_1_0); + + // Send resource requests to the scheduler + application_0.schedule(); // allocate + application_1.schedule(); // allocate + + nodeUpdate(nm_0); + + nodeUpdate(nm_1); + + CapacityScheduler cs = + (CapacityScheduler) resourceManager.getResourceScheduler(); + CSQueue origRootQ = cs.getRootQueue(); + CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ); + int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues()); + int origNumAppsRoot = origRootQ.getNumApplications(); + + scheduler.moveApplication(application_0.getApplicationId(), "a2"); + + CSQueue newRootQ = cs.getRootQueue(); + int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues()); + int newNumAppsRoot = newRootQ.getNumApplications(); + CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ); + CapacitySchedulerLeafQueueInfo origOldA1 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues()); + CapacitySchedulerLeafQueueInfo origNewA1 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", newInfo.getQueues()); + CapacitySchedulerLeafQueueInfo targetOldA2 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", oldInfo.getQueues()); + CapacitySchedulerLeafQueueInfo targetNewA2 = + (CapacitySchedulerLeafQueueInfo) getQueueInfo("a2", newInfo.getQueues()); + // originally submitted here + assertEquals(1, origOldA1.getNumApplications()); + assertEquals(1, origNumAppsA); + assertEquals(2, origNumAppsRoot); + // after the move + assertEquals(0, origNewA1.getNumApplications()); + assertEquals(1, newNumAppsA); + assertEquals(2, newNumAppsRoot); + // original consumption on a1 + assertEquals(3 * GB, origOldA1.getResourcesUsed().getMemory()); + assertEquals(1, origOldA1.getResourcesUsed().getvCores()); + assertEquals(0, origNewA1.getResourcesUsed().getMemory()); // after the move + assertEquals(0, origNewA1.getResourcesUsed().getvCores()); // after the move + // app moved here with live containers + assertEquals(3 * GB, targetNewA2.getResourcesUsed().getMemory()); + assertEquals(1, targetNewA2.getResourcesUsed().getvCores()); + // it was empty before the move + assertEquals(0, targetOldA2.getNumApplications()); + assertEquals(0, targetOldA2.getResourcesUsed().getMemory()); + assertEquals(0, targetOldA2.getResourcesUsed().getvCores()); + // after the app moved here + assertEquals(1, targetNewA2.getNumApplications()); + // 1 container on original queue before move + assertEquals(1, origOldA1.getNumContainers()); + // after the move the resource released + assertEquals(0, origNewA1.getNumContainers()); + // and moved to the new queue + assertEquals(1, targetNewA2.getNumContainers()); + // which originally didn't have any + assertEquals(0, targetOldA2.getNumContainers()); + // 1 user with 3GB + assertEquals(3 * GB, origOldA1.getUsers().getUsersList().get(0) + .getResourcesUsed().getMemory()); + // 1 user with 1 core + assertEquals(1, origOldA1.getUsers().getUsersList().get(0) + .getResourcesUsed().getvCores()); + // user ha no more running app in the orig queue + assertEquals(0, origNewA1.getUsers().getUsersList().size()); + // 1 user with 3GB + assertEquals(3 * GB, targetNewA2.getUsers().getUsersList().get(0) + .getResourcesUsed().getMemory()); + // 1 user with 1 core + assertEquals(1, targetNewA2.getUsers().getUsersList().get(0) + .getResourcesUsed().getvCores()); + + // Get allocations from the scheduler + application_0.schedule(); // task_0_0 + checkApplicationResourceUsage(3 * GB, application_0); + + application_1.schedule(); // task_1_0 + checkApplicationResourceUsage(1 * GB, application_1); + + // task_1_0 (1G) application_0 moved to b2 with max running app 1 so it is + // not scheduled + checkNodeResourceUsage(4 * GB, nm_0); + checkNodeResourceUsage(0 * GB, nm_1); + + } + + private int getNumAppsInQueue(String name, List<CSQueue> queues) { + for (CSQueue queue : queues) { + if (queue.getQueueName().equals(name)) { + return queue.getNumApplications(); + } + } + return -1; + } + + private CapacitySchedulerQueueInfo getQueueInfo(String name, + CapacitySchedulerQueueInfoList info) { + if (info != null) { + for (CapacitySchedulerQueueInfo queueInfo : info.getQueueInfoList()) { + if (queueInfo.getQueueName().equals(name)) { + return queueInfo; + } else { + CapacitySchedulerQueueInfo result = + getQueueInfo(name, queueInfo.getQueues()); + if (result == null) { + continue; + } + return result; + } + } + } + return null; + } + + @Test + public void testMoveAllApps() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + scheduler.moveAllApps("a1", "b1"); + + // check postconditions + Thread.sleep(1000); + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("b1")); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(1, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAllAppsInvalidDestination() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + try { + scheduler.moveAllApps("a1", "DOES_NOT_EXIST"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + + @Test + public void testMoveAllAppsInvalidSource() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + // now move the app + try { + scheduler.moveAllApps("DOES_NOT_EXIST", "b1"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + rm.stop(); + } + + @Test + public void testKillAllAppsInQueue() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertTrue(queue.equals("a1")); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + // now kill the app + scheduler.killAllAppsInQueue("a1"); + + // check postconditions + rm.waitForState(app.getApplicationId(), RMAppState.KILLED); + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.isEmpty()); + + appsInA1 = scheduler.getAppsInQueue("a1"); + assertTrue(appsInA1.isEmpty()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.isEmpty()); + + rm.stop(); + } + + @Test + public void testKillAllAppsInvalidSource() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + + // submit an app + RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + // check preconditions + List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + // now kill the app + try { + scheduler.killAllAppsInQueue("DOES_NOT_EXIST"); + Assert.fail(); + } catch (YarnException e) { + // expected + } + + // check postconditions, app should still be in a1 + appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(1, appsInA1.size()); + + appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(1, appsInA.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(1, appsInRoot.size()); + + rm.stop(); + } + }
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java Wed Aug 20 01:34:29 2014 @@ -86,13 +86,12 @@ public class TestUtils { Configuration conf = new Configuration(); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); - RMContext rmContext = + RMContextImpl rmContext = new RMContextImpl(nullDispatcher, cae, null, null, null, - new AMRMTokenSecretManager(conf), + new AMRMTokenSecretManager(conf, null), new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), writer); - return rmContext; } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Wed Aug 20 01:34:29 2014 @@ -28,10 +28,11 @@ import org.apache.hadoop.yarn.util.resou /** * Dummy implementation of Schedulable for unit testing. */ -public class FakeSchedulable extends Schedulable { +public class FakeSchedulable implements Schedulable { private Resource usage; private Resource minShare; private Resource maxShare; + private Resource fairShare; private ResourceWeights weights; private Priority priority; private long startTime; @@ -90,6 +91,21 @@ public class FakeSchedulable extends Sch } @Override + public Resource getFairShare() { + return this.fairShare; + } + + @Override + public void setFairShare(Resource fairShare) { + this.fairShare = fairShare; + } + + @Override + public boolean isActive() { + return true; + } + + @Override public Resource getDemand() { return null; } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java Wed Aug 20 01:34:29 2014 @@ -62,7 +62,7 @@ public class TestFSLeafQueue { @Test public void testUpdateDemand() { - AppSchedulable app = mock(AppSchedulable.class); + FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); schedulable.addAppSchedulable(app); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Wed Aug 20 01:34:29 2014 @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -56,7 +58,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -80,13 +81,11 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -290,6 +289,7 @@ public class TestFairScheduler extends F // Have two queues which want entire cluster capacity createSchedulingRequest(10 * 1024, "queue1", "user1"); createSchedulingRequest(10 * 1024, "queue2", "user1"); + createSchedulingRequest(10 * 1024, "root.default", "user1"); scheduler.update(); @@ -320,6 +320,7 @@ public class TestFairScheduler extends F // Have two queues which want entire cluster capacity createSchedulingRequest(10 * 1024, "parent.queue2", "user1"); createSchedulingRequest(10 * 1024, "parent.queue3", "user1"); + createSchedulingRequest(10 * 1024, "root.default", "user1"); scheduler.update(); @@ -764,8 +765,10 @@ public class TestFairScheduler extends F scheduler.handle(nodeEvent1); // user1,user2 submit their apps to parentq and create user queues - scheduler.assignToQueue(rmApp1, "root.parentq", "user1"); - scheduler.assignToQueue(rmApp2, "root.parentq", "user2"); + createSchedulingRequest(10 * 1024, "root.parentq", "user1"); + createSchedulingRequest(10 * 1024, "root.parentq", "user2"); + // user3 submits app in default queue + createSchedulingRequest(10 * 1024, "root.default", "user3"); scheduler.update(); @@ -1285,7 +1288,7 @@ public class TestFairScheduler extends F scheduler.update(); Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager() .getLeafQueue("queueA.queueA2", false), clock.getTime()); - assertEquals(2980, toPreempt.getMemory()); + assertEquals(3277, toPreempt.getMemory()); // verify if the 3 containers required by queueA2 are preempted in the same // round @@ -1533,7 +1536,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(1, app.getLiveContainers().size()); ContainerId containerId = scheduler.getSchedulerApp(attId) @@ -1607,9 +1610,9 @@ public class TestFairScheduler extends F ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname2", 1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); assertNotNull("The application was not allowed", app1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); assertNull("The application was allowed", app2); } @@ -1682,8 +1685,8 @@ public class TestFairScheduler extends F "user1", 2); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); queue1.setPolicy(new FifoPolicy()); @@ -1725,7 +1728,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(1024, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1760,7 +1763,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(0, 1, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1824,10 +1827,10 @@ public class TestFairScheduler extends F ApplicationAttemptId attId4 = createSchedulingRequest(1024, fifoQueue, user, 4); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.getQueueManager().getLeafQueue(fifoQueue, true) .setPolicy(SchedulingPolicy.parse("fifo")); @@ -1946,7 +1949,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(0, app.getLiveContainers().size()); assertEquals(0, app.getReservedContainers().size()); @@ -2019,7 +2022,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2060,7 +2063,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2095,7 +2098,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 0); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true); ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true); @@ -2137,7 +2140,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", "user1", 2); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); @@ -2159,10 +2162,10 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2202,13 +2205,13 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2241,19 +2244,19 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterResource()); @@ -2335,7 +2338,7 @@ public class TestFairScheduler extends F NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2347,14 +2350,14 @@ public class TestFairScheduler extends F } private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) { - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); FSLeafQueue queue = app.getQueue(); - Collection<AppSchedulable> runnableApps = + Collection<FSAppAttempt> runnableApps = queue.getRunnableAppSchedulables(); - Collection<AppSchedulable> nonRunnableApps = + Collection<FSAppAttempt> nonRunnableApps = queue.getNonRunnableAppSchedulables(); - assertEquals(runnable, runnableApps.contains(app.getAppSchedulable())); - assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable())); + assertEquals(runnable, runnableApps.contains(app)); + assertEquals(!runnable, nonRunnableApps.contains(app)); } private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue, @@ -2444,8 +2447,12 @@ public class TestFairScheduler extends F scheduler.update(); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); - assertEquals("Queue queue1's fair share should be 10240", - 10240, queue1.getFairShare().getMemory()); + assertEquals("Queue queue1's fair share should be 0", 0, queue1 + .getFairShare().getMemory()); + + createSchedulingRequest(1 * 1024, "root.default", "user1"); + scheduler.update(); + scheduler.handle(updateEvent); Resource amResource1 = Resource.newInstance(1024, 1); Resource amResource2 = Resource.newInstance(2048, 2); @@ -2455,7 +2462,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId1 = createAppAttemptId(1, 1); createApplicationWithAMResource(attId1, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application1's AM requests 1024 MB memory", @@ -2469,7 +2476,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId2 = createAppAttemptId(2, 1); createApplicationWithAMResource(attId2, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application2's AM requests 1024 MB memory", @@ -2483,7 +2490,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId3 = createAppAttemptId(3, 1); createApplicationWithAMResource(attId3, "queue1", "user1", amResource1); createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application3's AM requests 1024 MB memory", @@ -2519,7 +2526,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId4 = createAppAttemptId(4, 1); createApplicationWithAMResource(attId4, "queue1", "user1", amResource2); createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application4's AM requests 2048 MB memory", @@ -2533,7 +2540,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId5 = createAppAttemptId(5, 1); createApplicationWithAMResource(attId5, "queue1", "user1", amResource2); createSchedulingRequestExistingApplication(2048, 2, amPriority, attId5); - FSSchedulerApp app5 = scheduler.getSchedulerApp(attId5); + FSAppAttempt app5 = scheduler.getSchedulerApp(attId5); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application5's AM requests 2048 MB memory", @@ -2576,7 +2583,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId6 = createAppAttemptId(6, 1); createApplicationWithAMResource(attId6, "queue1", "user1", amResource3); createSchedulingRequestExistingApplication(1860, 2, amPriority, attId6); - FSSchedulerApp app6 = scheduler.getSchedulerApp(attId6); + FSAppAttempt app6 = scheduler.getSchedulerApp(attId6); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application6's AM should not be running", @@ -2633,24 +2640,32 @@ public class TestFairScheduler extends F FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); - assertEquals("Queue queue1's fair share should be 1366", - 1366, queue1.getFairShare().getMemory()); + assertEquals("Queue queue1's fair share should be 0", 0, queue1 + .getFairShare().getMemory()); FSLeafQueue queue2 = scheduler.getQueueManager().getLeafQueue("queue2", true); - assertEquals("Queue queue2's fair share should be 1366", - 1366, queue2.getFairShare().getMemory()); + assertEquals("Queue queue2's fair share should be 0", 0, queue2 + .getFairShare().getMemory()); FSLeafQueue queue3 = scheduler.getQueueManager().getLeafQueue("queue3", true); - assertEquals("Queue queue3's fair share should be 1366", - 1366, queue3.getFairShare().getMemory()); + assertEquals("Queue queue3's fair share should be 0", 0, queue3 + .getFairShare().getMemory()); FSLeafQueue queue4 = scheduler.getQueueManager().getLeafQueue("queue4", true); - assertEquals("Queue queue4's fair share should be 1366", - 1366, queue4.getFairShare().getMemory()); + assertEquals("Queue queue4's fair share should be 0", 0, queue4 + .getFairShare().getMemory()); FSLeafQueue queue5 = scheduler.getQueueManager().getLeafQueue("queue5", true); - assertEquals("Queue queue5's fair share should be 1366", - 1366, queue5.getFairShare().getMemory()); + assertEquals("Queue queue5's fair share should be 0", 0, queue5 + .getFairShare().getMemory()); + + List<String> queues = Arrays.asList("root.default", "root.queue3", + "root.queue4", "root.queue5"); + for (String queue : queues) { + createSchedulingRequest(1 * 1024, queue, "user1"); + scheduler.update(); + scheduler.handle(updateEvent); + } Resource amResource1 = Resource.newInstance(2048, 1); int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); @@ -2659,7 +2674,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId1 = createAppAttemptId(1, 1); createApplicationWithAMResource(attId1, "queue1", "test1", amResource1); createSchedulingRequestExistingApplication(2048, 1, amPriority, attId1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application1's AM requests 2048 MB memory", @@ -2673,7 +2688,7 @@ public class TestFairScheduler extends F ApplicationAttemptId attId2 = createAppAttemptId(2, 1); createApplicationWithAMResource(attId2, "queue2", "test1", amResource1); createSchedulingRequestExistingApplication(2048, 1, amPriority, attId2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); scheduler.update(); scheduler.handle(updateEvent); assertEquals("Application2's AM requests 2048 MB memory", @@ -2805,7 +2820,7 @@ public class TestFairScheduler extends F // at least one pass Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); - FSSchedulerApp app = fs.getSchedulerApp(appAttemptId); + FSAppAttempt app = fs.getSchedulerApp(appAttemptId); // Wait until app gets resources. while (app.getCurrentConsumption().equals(Resources.none())) { } @@ -2989,7 +3004,7 @@ public class TestFairScheduler extends F assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() .size()); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // ResourceRequest will be empty once NodeUpdate is completed Assert.assertNull(app.getResourceRequest(priority, host)); @@ -3045,7 +3060,7 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, "root.default", "user", 1); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), @@ -3153,12 +3168,10 @@ public class TestFairScheduler extends F assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand()); scheduler.moveApplication(appId, "queue2"); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttId); assertSame(targetQueue, app.getQueue()); - assertFalse(oldQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertTrue(targetQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + assertFalse(oldQueue.getRunnableAppSchedulables().contains(app)); + assertTrue(targetQueue.getRunnableAppSchedulables().contains(app)); assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage()); assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage()); assertEquals(0, oldQueue.getNumRunnableApps()); @@ -3206,17 +3219,13 @@ public class TestFairScheduler extends F ApplicationAttemptId appAttId = createSchedulingRequest(1024, 1, "queue1", "user1", 3); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttId); - assertTrue(oldQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + FSAppAttempt app = scheduler.getSchedulerApp(appAttId); + assertTrue(oldQueue.getNonRunnableAppSchedulables().contains(app)); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); - assertFalse(oldQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertFalse(targetQueue.getNonRunnableAppSchedulables() - .contains(app.getAppSchedulable())); - assertTrue(targetQueue.getRunnableAppSchedulables() - .contains(app.getAppSchedulable())); + assertFalse(oldQueue.getNonRunnableAppSchedulables().contains(app)); + assertFalse(targetQueue.getNonRunnableAppSchedulables().contains(app)); + assertTrue(targetQueue.getRunnableAppSchedulables().contains(app)); assertEquals(1, targetQueue.getNumRunnableApps()); assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps()); } @@ -3341,4 +3350,38 @@ public class TestFairScheduler extends F scheduler.findLowestCommonAncestorQueue(a1Queue, b1Queue); assertEquals(ancestorQueue, queue1); } + + @Test + public void testThreadLifeCycle() throws InterruptedException { + conf.setBoolean( + FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true); + scheduler.init(conf); + scheduler.start(); + + Thread updateThread = scheduler.updateThread; + Thread schedulingThread = scheduler.schedulingThread; + + assertTrue(updateThread.isAlive()); + assertTrue(schedulingThread.isAlive()); + + scheduler.stop(); + + int numRetries = 100; + while (numRetries-- > 0 && + (updateThread.isAlive() || schedulingThread.isAlive())) { + Thread.sleep(50); + } + + assertNotEquals("One of the threads is still alive", 0, numRetries); + } + + @Test + public void testPerfMetricsInited() { + scheduler.init(conf); + scheduler.start(); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + scheduler.fsOpDurations.getMetrics(collector, true); + assertEquals("Incorrect number of perf metrics", 1, + collector.getRecords().size()); + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java Wed Aug 20 01:34:29 2014 @@ -42,12 +42,13 @@ public class TestMaxRunningAppsEnforcer private int appNum; private TestFairScheduler.MockClock clock; private RMContext rmContext; + private FairScheduler scheduler; @Before public void setup() throws Exception { Configuration conf = new Configuration(); clock = new TestFairScheduler.MockClock(); - FairScheduler scheduler = mock(FairScheduler.class); + scheduler = mock(FairScheduler.class); when(scheduler.getConf()).thenReturn( new FairSchedulerConfiguration(conf)); when(scheduler.getClock()).thenReturn(clock); @@ -65,11 +66,11 @@ public class TestMaxRunningAppsEnforcer when(rmContext.getEpoch()).thenReturn(0); } - private FSSchedulerApp addApp(FSLeafQueue queue, String user) { + private FSAppAttempt addApp(FSLeafQueue queue, String user) { ApplicationId appId = ApplicationId.newInstance(0l, appNum++); ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user); - FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, + FSAppAttempt app = new FSAppAttempt(scheduler, attId, user, queue, null, rmContext); queue.addApp(app, runnable); if (runnable) { @@ -80,7 +81,7 @@ public class TestMaxRunningAppsEnforcer return app; } - private void removeApp(FSSchedulerApp app) { + private void removeApp(FSAppAttempt app) { app.getQueue().removeApp(app); maxAppsEnforcer.untrackRunnableApp(app); maxAppsEnforcer.updateRunnabilityOnAppRemoval(app, app.getQueue()); @@ -93,7 +94,7 @@ public class TestMaxRunningAppsEnforcer queueMaxApps.put("root", 2); queueMaxApps.put("root.queue1", 1); queueMaxApps.put("root.queue2", 1); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); assertEquals(1, leaf1.getRunnableAppSchedulables().size()); @@ -110,7 +111,7 @@ public class TestMaxRunningAppsEnforcer FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); assertEquals(1, leaf1.getRunnableAppSchedulables().size()); @@ -128,7 +129,7 @@ public class TestMaxRunningAppsEnforcer FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true); queueMaxApps.put("root.queue1.leaf1", 2); userMaxApps.put("user1", 1); - FSSchedulerApp app1 = addApp(leaf1, "user1"); + FSAppAttempt app1 = addApp(leaf1, "user1"); addApp(leaf1, "user2"); addApp(leaf1, "user3"); addApp(leaf2, "user1"); @@ -147,7 +148,7 @@ public class TestMaxRunningAppsEnforcer FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); clock.tick(20); @@ -167,7 +168,7 @@ public class TestMaxRunningAppsEnforcer FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true); FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true); queueMaxApps.put("root.queue1", 2); - FSSchedulerApp app1 = addApp(leaf1, "user"); + FSAppAttempt app1 = addApp(leaf1, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); addApp(leaf2, "user"); @@ -182,21 +183,18 @@ public class TestMaxRunningAppsEnforcer @Test public void testMultiListStartTimeIteratorEmptyAppLists() { - List<List<AppSchedulable>> lists = new ArrayList<List<AppSchedulable>>(); - lists.add(Arrays.asList(mockAppSched(1))); - lists.add(Arrays.asList(mockAppSched(2))); - Iterator<FSSchedulerApp> iter = + List<List<FSAppAttempt>> lists = new ArrayList<List<FSAppAttempt>>(); + lists.add(Arrays.asList(mockAppAttempt(1))); + lists.add(Arrays.asList(mockAppAttempt(2))); + Iterator<FSAppAttempt> iter = new MaxRunningAppsEnforcer.MultiListStartTimeIterator(lists); - assertEquals(1, iter.next().getAppSchedulable().getStartTime()); - assertEquals(2, iter.next().getAppSchedulable().getStartTime()); + assertEquals(1, iter.next().getStartTime()); + assertEquals(2, iter.next().getStartTime()); } - private AppSchedulable mockAppSched(long startTime) { - AppSchedulable appSched = mock(AppSchedulable.class); - when(appSched.getStartTime()).thenReturn(startTime); - FSSchedulerApp schedApp = mock(FSSchedulerApp.class); - when(schedApp.getAppSchedulable()).thenReturn(appSched); - when(appSched.getApp()).thenReturn(schedApp); - return appSched; + private FSAppAttempt mockAppAttempt(long startTime) { + FSAppAttempt schedApp = mock(FSAppAttempt.class); + when(schedApp.getStartTime()).thenReturn(startTime); + return schedApp; } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java Wed Aug 20 01:34:29 2014 @@ -18,15 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; +import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Collection; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -34,6 +35,7 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; @@ -53,6 +56,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -184,8 +188,8 @@ public class TestAMRMTokens { // The exception will still have the earlier appAttemptId as it picks it // up from the token. Assert.assertTrue(t.getCause().getMessage().contains( - "Password not found for ApplicationAttempt " + - applicationAttemptId.toString())); + applicationAttemptId.toString() + + " not found in AMRMTokenSecretManager.")); } } finally { @@ -328,6 +332,51 @@ public class TestAMRMTokens { } } + @Test (timeout = 20000) + public void testAMRMMasterKeysUpdate() throws Exception { + MockRM rm = new MockRM(conf) { + @Override + protected void doSecureLogin() throws IOException { + // Skip the login. + } + }; + rm.start(); + MockNM nm = rm.registerNode("127.0.0.1:1234", 8000); + RMApp app = rm.submitApp(200); + MockAM am = MockRM.launchAndRegisterAM(app, rm, nm); + + // Do allocate. Should not update AMRMToken + AllocateResponse response = + am.allocate(Records.newRecord(AllocateRequest.class)); + Assert.assertNull(response.getAMRMToken()); + + // roll over the master key + // Do allocate again. the AM should get the latest AMRMToken + rm.getRMContext().getAMRMTokenSecretManager().rollMasterKey(); + response = am.allocate(Records.newRecord(AllocateRequest.class)); + Assert.assertNotNull(response.getAMRMToken()); + + Token<AMRMTokenIdentifier> amrmToken = + ConverterUtils.convertFromYarn(response.getAMRMToken(), new Text( + response.getAMRMToken().getService())); + + Assert.assertEquals(amrmToken.decodeIdentifier().getKeyId(), rm + .getRMContext().getAMRMTokenSecretManager().getMasterKey().getMasterKey() + .getKeyId()); + + // Do allocate again. The master key does not update. + // AM should not update its AMRMToken either + response = am.allocate(Records.newRecord(AllocateRequest.class)); + Assert.assertNull(response.getAMRMToken()); + + // Activate the next master key. Since there is new master key generated + // in AMRMTokenSecretManager. The AMRMToken will not get updated for AM + rm.getRMContext().getAMRMTokenSecretManager().activateNextMasterKey(); + response = am.allocate(Records.newRecord(AllocateRequest.class)); + Assert.assertNull(response.getAMRMToken()); + rm.stop(); + } + private ApplicationMasterProtocol createRMClient(final MockRM rm, final Configuration conf, final YarnRPC rpc, UserGroupInformation currentUser) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ResourceManagerRest.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ResourceManagerRest.apt.vm?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ResourceManagerRest.apt.vm (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ResourceManagerRest.apt.vm Wed Aug 20 01:34:29 2014 @@ -2912,3 +2912,24 @@ Accept: application/xml +---+ No response body. + +** Authentication using delegation tokens + + This feature is in the alpha mode and may change in the future. + + You can use delegation tokens to authenticate yourself when using YARN RM webservices. However, this requires setting the right configurations. The conditions for this are: + + * Hadoop is setup in secure mode with the authentication type set to kerberos. + + * Hadoop HTTP authentication is setup with the authentication type set to kerberos + + Once setup, delegation tokens can be fetched using the web services listed above and used as shown in an example below: + ++---+ + PUT http://<rm http address:port>/ws/v1/cluster/apps/application_1399397633663_0003/state + Hadoop-YARN-Auth-Delegation-Token: MgASY2xpZW50QEVYQU1QTEUuQ09NDHRlc3QtcmVuZXdlcgCKAUbjqcHHigFHB7ZFxwQCFKWD3znCkDSy6SQIjRCLDydxbxvgE1JNX0RFTEVHQVRJT05fVE9LRU4A + Content-Type: application/json; charset=UTF8 + { + "state":"KILLED" + } ++---+
