Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff ============================================================================== --- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original) +++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Fri Feb 7 20:33:01 2014 @@ -78,8 +78,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; @@ -256,12 +255,7 @@ public class TestFairScheduler { private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, String queueId, String userId, int numContainers, int priority) { ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(id.getApplicationId(), queueId, userId); - // This conditional is for testAclSubmitApplication where app is rejected - // and no app is added. - if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false); - } + scheduler.addApplicationAttempt(id, queueId, userId); List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, priority, numContainers, true); @@ -589,7 +583,7 @@ public class TestFairScheduler { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). getResourceUsage().getMemory()); - assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory()); + assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory()); // Now another node checks in with capacity RMNode node2 = @@ -605,10 +599,10 @@ public class TestFairScheduler { getResourceUsage().getMemory()); // The old reservation should still be there... - assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory()); + assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory()); // ... but it should disappear when we update the first node. scheduler.handle(updateEvent); - assertEquals(0, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemory()); + assertEquals(0, scheduler.applications.get(attId).getCurrentReservation().getMemory()); } @@ -624,13 +618,9 @@ public class TestFairScheduler { null, null, null, false, false, 0, null, null), null, null, 0, null); appsMap.put(appAttemptId.getApplicationId(), rmApp); - AppAddedSchedulerEvent appAddedEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default", - "user1"); + AppAttemptAddedSchedulerEvent appAddedEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user1"); scheduler.handle(appAddedEvent); - AppAttemptAddedSchedulerEvent attempAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attempAddedEvent); assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true) .getRunnableAppSchedulables().size()); assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true) @@ -649,14 +639,10 @@ public class TestFairScheduler { null, null, null, ApplicationSubmissionContext.newInstance(null, null, null, null, null, false, false, 0, null, null), null, null, 0, null); appsMap.put(appAttemptId.getApplicationId(), rmApp); - - AppAddedSchedulerEvent appAddedEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default", - "user2"); - scheduler.handle(appAddedEvent); - AppAttemptAddedSchedulerEvent attempAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attempAddedEvent); + + AppAttemptAddedSchedulerEvent appAddedEvent2 = + new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user2"); + scheduler.handle(appAddedEvent2); assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true) .getRunnableAppSchedulables().size()); assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true) @@ -674,8 +660,8 @@ public class TestFairScheduler { // submit app with empty queue ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); - AppAddedSchedulerEvent appAddedEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "", "user1"); + AppAttemptAddedSchedulerEvent appAddedEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId, "", "user1"); scheduler.handle(appAddedEvent); // submission rejected @@ -709,6 +695,7 @@ public class TestFairScheduler { scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId appId; + Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications; List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>(); rules.add(new QueuePlacementRule.Specified().initialize(true, null)); @@ -721,17 +708,17 @@ public class TestFairScheduler { scheduler.getAllocationConfiguration().placementPolicy = new QueuePlacementPolicy(rules, queues, conf); appId = createSchedulingRequest(1024, "somequeue", "user1"); - assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName()); + assertEquals("root.somequeue", apps.get(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user1"); - assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName()); + assertEquals("root.user1", apps.get(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user3"); - assertEquals("root.user3group", scheduler.getSchedulerApp(appId).getQueueName()); + assertEquals("root.user3group", apps.get(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user4"); - assertEquals("root.user4subgroup1", scheduler.getSchedulerApp(appId).getQueueName()); + assertEquals("root.user4subgroup1", apps.get(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user5"); - assertEquals("root.user5subgroup2", scheduler.getSchedulerApp(appId).getQueueName()); + assertEquals("root.user5subgroup2", apps.get(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "otheruser"); - assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName()); + assertEquals("root.default", apps.get(appId).getQueueName()); // test without specified as first rule rules = new ArrayList<QueuePlacementRule>(); @@ -741,11 +728,11 @@ public class TestFairScheduler { scheduler.getAllocationConfiguration().placementPolicy = new QueuePlacementPolicy(rules, queues, conf); appId = createSchedulingRequest(1024, "somequeue", "user1"); - assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName()); + assertEquals("root.user1", apps.get(appId).getQueueName()); appId = createSchedulingRequest(1024, "somequeue", "otheruser"); - assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName()); + assertEquals("root.somequeue", apps.get(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "otheruser"); - assertEquals("root.default", scheduler.getSchedulerApp(appId).getQueueName()); + assertEquals("root.default", apps.get(appId).getQueueName()); } @Test @@ -799,14 +786,11 @@ public class TestFairScheduler { scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId id11 = createAppAttemptId(1, 1); - scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); - scheduler.addApplicationAttempt(id11, false); + scheduler.addApplicationAttempt(id11, "root.queue1", "user1"); ApplicationAttemptId id21 = createAppAttemptId(2, 1); - scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id21, false); + scheduler.addApplicationAttempt(id21, "root.queue2", "user1"); ApplicationAttemptId id22 = createAppAttemptId(2, 2); - scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id22, false); + scheduler.addApplicationAttempt(id22, "root.queue2", "user1"); int minReqSize = FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; @@ -847,13 +831,11 @@ public class TestFairScheduler { @Test public void testAppAdditionAndRemoval() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); - ApplicationAttemptId attemptId =createAppAttemptId(1, 1); - AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default", - "user1"); - scheduler.handle(appAddedEvent); - AppAttemptAddedSchedulerEvent attemptAddedEvent = - new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), false); - scheduler.handle(attemptAddedEvent); + + AppAttemptAddedSchedulerEvent appAddedEvent1 = + new AppAttemptAddedSchedulerEvent(createAppAttemptId(1, 1), "default", + "user1"); + scheduler.handle(appAddedEvent1); // Scheduler should have two queues (the default and the one created for user1) assertEquals(2, scheduler.getQueueManager().getLeafQueues().size()); @@ -863,7 +845,7 @@ public class TestFairScheduler { .getRunnableAppSchedulables().size()); AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( - createAppAttemptId(1, 1), RMAppAttemptState.FINISHED, false); + createAppAttemptId(1, 1), RMAppAttemptState.FINISHED); // Now remove app scheduler.handle(appRemovedEvent1); @@ -1136,12 +1118,12 @@ public class TestFairScheduler { scheduler.handle(nodeUpdate3); } - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app3).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app6).getLiveContainers().size()); // Now new requests arrive from queues C and D ApplicationAttemptId app7 = @@ -1164,16 +1146,16 @@ public class TestFairScheduler { // Make sure it is lowest priority container. scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); // First verify we are adding containers to preemption list for the application - assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(), - scheduler.getSchedulerApp(app3).getPreemptionContainers())); - assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(), - scheduler.getSchedulerApp(app6).getPreemptionContainers())); + assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(), + scheduler.applications.get(app3).getPreemptionContainers())); + assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(), + scheduler.applications.get(app6).getPreemptionContainers())); // Pretend 15 seconds have passed clock.tick(15); @@ -1183,8 +1165,8 @@ public class TestFairScheduler { Resources.createResource(2 * 1024)); // At this point the containers should have been killed (since we are not simulating AM) - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); // Trigger a kill by insisting we want containers back scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), @@ -1198,22 +1180,22 @@ public class TestFairScheduler { scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); // Now A and B are below fair share, so preemption shouldn't do anything scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); } @Test (timeout = 5000) @@ -1372,9 +1354,9 @@ public class TestFairScheduler { // One container should get reservation and the other should get nothing assertEquals(1024, - scheduler.getSchedulerApp(attId1).getCurrentReservation().getMemory()); + scheduler.applications.get(attId1).getCurrentReservation().getMemory()); assertEquals(0, - scheduler.getSchedulerApp(attId2).getCurrentReservation().getMemory()); + scheduler.applications.get(attId2).getCurrentReservation().getMemory()); } @Test (timeout = 5000) @@ -1409,7 +1391,7 @@ public class TestFairScheduler { scheduler.handle(updateEvent); // App 1 should be running - assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(attId1).getLiveContainers().size()); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 1); @@ -1418,7 +1400,7 @@ public class TestFairScheduler { scheduler.handle(updateEvent); // App 2 should not be running - assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(attId2).getLiveContainers().size()); // Request another container for app 1 createSchedulingRequestExistingApplication(1024, 1, attId1); @@ -1427,7 +1409,7 @@ public class TestFairScheduler { scheduler.handle(updateEvent); // Request should be fulfilled - assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + assertEquals(2, scheduler.applications.get(attId1).getLiveContainers().size()); } @Test (timeout = 5000) @@ -1447,10 +1429,10 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerApp app = scheduler.applications.get(attId); assertEquals(1, app.getLiveContainers().size()); - ContainerId containerId = scheduler.getSchedulerApp(attId) + ContainerId containerId = scheduler.applications.get(attId) .getLiveContainers().iterator().next().getContainerId(); // Cause reservation to be created @@ -1519,9 +1501,9 @@ public class TestFairScheduler { ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname2", 1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSSchedulerApp app1 = scheduler.applications.get(attId1); assertNotNull("The application was not allowed", app1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSSchedulerApp app2 = scheduler.applications.get(attId2); assertNull("The application was allowed", app2); } @@ -1544,8 +1526,7 @@ public class TestFairScheduler { scheduler.handle(nodeEvent2); ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); - scheduler.addApplicationAttempt(appId, false); + scheduler.addApplicationAttempt(appId, "queue1", "user1"); // 1 request with 2 nodes on the same rack. another request with 1 node on // a different rack @@ -1564,14 +1545,14 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent1); // should assign node local - assertEquals(1, scheduler.getSchedulerApp(appId).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(appId).getLiveContainers().size()); // node 2 checks in scheduler.update(); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); // should assign rack local - assertEquals(2, scheduler.getSchedulerApp(appId).getLiveContainers().size()); + assertEquals(2, scheduler.applications.get(appId).getLiveContainers().size()); } @Test (timeout = 5000) @@ -1590,8 +1571,8 @@ public class TestFairScheduler { "user1", 2); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSSchedulerApp app1 = scheduler.applications.get(attId1); + FSSchedulerApp app2 = scheduler.applications.get(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); queue1.setPolicy(new FifoPolicy()); @@ -1631,7 +1612,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = createSchedulingRequest(1024, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerApp app = scheduler.applications.get(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1693,10 +1674,10 @@ public class TestFairScheduler { ApplicationAttemptId attId4 = createSchedulingRequest(1024, fifoQueue, user, 4); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSSchedulerApp app1 = scheduler.applications.get(attId1); + FSSchedulerApp app2 = scheduler.applications.get(attId2); + FSSchedulerApp app3 = scheduler.applications.get(attId3); + FSSchedulerApp app4 = scheduler.applications.get(attId4); scheduler.getQueueManager().getLeafQueue(fifoQueue, true) .setPolicy(SchedulingPolicy.parse("fifo")); @@ -1783,7 +1764,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++); - scheduler.addApplication(attId.getApplicationId(), queue, user); + scheduler.addApplicationAttempt(attId, queue, user); numTries = 0; while (application.getFinishTime() == 0 && numTries < MAX_TRIES) { @@ -1811,7 +1792,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerApp app = scheduler.applications.get(attId); assertEquals(0, app.getLiveContainers().size()); assertEquals(0, app.getReservedContainers().size()); @@ -1880,7 +1861,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSSchedulerApp app = scheduler.applications.get(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -1919,7 +1900,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSSchedulerApp app = scheduler.applications.get(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -1952,7 +1933,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 0); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerApp app = scheduler.applications.get(attId); ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true); ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true); @@ -1992,7 +1973,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", "user1", 2); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerApp app = scheduler.applications.get(attId); scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); @@ -2012,10 +1993,10 @@ public class TestFairScheduler { ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSSchedulerApp app1 = scheduler.applications.get(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSSchedulerApp app2 = scheduler.applications.get(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2053,13 +2034,13 @@ public class TestFairScheduler { ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSSchedulerApp app1 = scheduler.applications.get(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSSchedulerApp app2 = scheduler.applications.get(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSSchedulerApp app3 = scheduler.applications.get(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2090,19 +2071,19 @@ public class TestFairScheduler { ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSSchedulerApp app1 = scheduler.applications.get(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSSchedulerApp app2 = scheduler.applications.get(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSSchedulerApp app3 = scheduler.applications.get(appAttId3); ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4); + FSSchedulerApp app4 = scheduler.applications.get(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); drfPolicy.initialize(scheduler.getClusterCapacity()); @@ -2182,7 +2163,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSSchedulerApp app = scheduler.applications.get(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2193,8 +2174,16 @@ public class TestFairScheduler { assertEquals(1, app.getLiveContainers().size()); } + @Test + public void testConcurrentAccessOnApplications() throws Exception { + FairScheduler fs = new FairScheduler(); + TestCapacityScheduler.verifyConcurrentAccessOnApplications( + fs.applications, FSSchedulerApp.class, FSLeafQueue.class); + } + + private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) { - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSSchedulerApp app = scheduler.applications.get(attId); FSLeafQueue queue = app.getQueue(); Collection<AppSchedulable> runnableApps = queue.getRunnableAppSchedulables(); @@ -2250,7 +2239,7 @@ public class TestFairScheduler { // Remove app 1 and both app 2 and app 4 should becomes runnable in its place AppAttemptRemovedSchedulerEvent appRemovedEvent1 = - new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false); + new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED); scheduler.handle(appRemovedEvent1); verifyAppRunnable(attId2, true); verifyQueueNumRunnable("queue2", 1, 0); @@ -2314,7 +2303,7 @@ public class TestFairScheduler { // Even though the app was removed from sub3, the app from sub2 gets to go // because it came in first AppAttemptRemovedSchedulerEvent appRemovedEvent1 = - new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false); + new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED); scheduler.handle(appRemovedEvent1); verifyAppRunnable(attId4, true); verifyQueueNumRunnable("queue1.sub2", 2, 0); @@ -2323,7 +2312,7 @@ public class TestFairScheduler { // Now test removal of a non-runnable app AppAttemptRemovedSchedulerEvent appRemovedEvent2 = - new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.KILLED, true); + new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.KILLED); scheduler.handle(appRemovedEvent2); assertEquals(0, scheduler.maxRunningEnforcer.usersNonRunnableApps .get("user1").size()); @@ -2331,7 +2320,7 @@ public class TestFairScheduler { verifyQueueNumRunnable("queue1.sub3", 0, 0); // verify it doesn't become runnable when there would be space for it AppAttemptRemovedSchedulerEvent appRemovedEvent3 = - new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.FINISHED, true); + new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.FINISHED); scheduler.handle(appRemovedEvent3); verifyQueueNumRunnable("queue1.sub2", 1, 0); verifyQueueNumRunnable("queue1.sub3", 0, 0); @@ -2367,8 +2356,7 @@ public class TestFairScheduler { // send application request ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); - fs.addApplicationAttempt(appAttemptId, false); + fs.addApplicationAttempt(appAttemptId, "queue11", "user11"); List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); @@ -2379,7 +2367,7 @@ public class TestFairScheduler { // at least one pass Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); - FSSchedulerApp app = fs.getSchedulerApp(appAttemptId); + FSSchedulerApp app = fs.applications.get(appAttemptId); // Wait until app gets resources. while (app.getCurrentConsumption().equals(Resources.none())) { } @@ -2467,7 +2455,7 @@ public class TestFairScheduler { ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, "root.default", "user", 1); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSSchedulerApp app = scheduler.applications.get(appAttemptId); // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), @@ -2477,7 +2465,7 @@ public class TestFairScheduler { scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), Collections.<ContainerId>emptyList(), null, Collections.singletonList(host)); - assertFalse(scheduler.getSchedulerApp(appAttemptId).isBlacklisted(host)); + assertFalse(scheduler.applications.get(appAttemptId).isBlacklisted(host)); List<ResourceRequest> update = Arrays.asList( createResourceRequest(GB, node.getHostName(), 1, 0, true)); @@ -2539,12 +2527,4 @@ public class TestFairScheduler { assertTrue(appAttIds.contains(appAttId1)); assertTrue(appAttIds.contains(appAttId2)); } - - @Test - public void testAddAndRemoveAppFromFairScheduler() throws Exception { - FairScheduler scheduler = - (FairScheduler) resourceManager.getResourceScheduler(); - TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( - scheduler.getSchedulerApplications(), scheduler, "default"); - } }
Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1565792&r1=1565791&r2=1565792&view=diff ============================================================================== --- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original) +++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Fri Feb 7 20:33:01 2014 @@ -61,11 +61,9 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -152,16 +150,14 @@ public class TestFifoScheduler { ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( appId, 1); - SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user"); - schedular.handle(appEvent); - SchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - schedular.handle(attemptEvent); + SchedulerEvent event = + new AppAttemptAddedSchedulerEvent(appAttemptId, "queue", "user"); + schedular.handle(event); appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2); - SchedulerEvent attemptEvent2 = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - schedular.handle(attemptEvent2); + + event = new AppAttemptAddedSchedulerEvent(appAttemptId, "queue", "user"); + schedular.handle(event); int afterAppsSubmitted = metrics.getAppsSubmitted(); Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted); @@ -192,13 +188,9 @@ public class TestFifoScheduler { int _appAttemptId = 1; ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, _appAttemptId); - AppAddedSchedulerEvent appEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1", - "user1"); - scheduler.handle(appEvent); - AppAttemptAddedSchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attemptEvent); + AppAttemptAddedSchedulerEvent appEvent1 = + new AppAttemptAddedSchedulerEvent(appAttemptId, "queue1", "user1"); + scheduler.handle(appEvent1); int memory = 64; int nConts = 3; @@ -282,13 +274,9 @@ public class TestFifoScheduler { int _appAttemptId = 1; ApplicationAttemptId appAttemptId = createAppAttemptId(_appId, _appAttemptId); - AppAddedSchedulerEvent appEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "queue1", - "user1"); - scheduler.handle(appEvent); - AppAttemptAddedSchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attemptEvent); + AppAttemptAddedSchedulerEvent appEvent1 = + new AppAttemptAddedSchedulerEvent(appAttemptId, "queue1", "user1"); + scheduler.handle(appEvent1); int memory = 1024; int priority = 1; @@ -528,6 +516,13 @@ public class TestFifoScheduler { LOG.info("--- END: testFifoScheduler ---"); } + @Test + public void testConcurrentAccessOnApplications() throws Exception { + FifoScheduler fs = new FifoScheduler(); + TestCapacityScheduler.verifyConcurrentAccessOnApplications( + fs.applications, FiCaSchedulerApp.class, Queue.class); + } + @SuppressWarnings("resource") @Test public void testBlackListNodes() throws Exception { @@ -546,23 +541,19 @@ public class TestFifoScheduler { ApplicationId appId = BuilderUtils.newApplicationId(100, 1); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( appId, 1); - SchedulerEvent appEvent = - new AppAddedSchedulerEvent(appId, "default", - "user"); - fs.handle(appEvent); - SchedulerEvent attemptEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - fs.handle(attemptEvent); + SchedulerEvent event = + new AppAttemptAddedSchedulerEvent(appAttemptId, "default", "user"); + fs.handle(event); // Verify the blacklist can be updated independent of requesting containers fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), Collections.<ContainerId>emptyList(), Collections.singletonList(host), null); - Assert.assertTrue(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); + Assert.assertTrue(fs.getApplication(appAttemptId).isBlacklisted(host)); fs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), Collections.<ContainerId>emptyList(), null, Collections.singletonList(host)); - Assert.assertFalse(fs.getApplicationAttempt(appAttemptId).isBlacklisted(host)); + Assert.assertFalse(fs.getApplication(appAttemptId).isBlacklisted(host)); rm.stop(); } @@ -584,17 +575,6 @@ public class TestFifoScheduler { Assert.assertNull(scheduler.getAppsInQueue("someotherqueue")); } - @Test - public void testAddAndRemoveAppFromFiFoScheduler() throws Exception { - Configuration conf = new Configuration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, - ResourceScheduler.class); - MockRM rm = new MockRM(conf); - FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler(); - TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( - fs.getSchedulerApplications(), fs, "queue"); - } - private void checkApplicationResourceUsage(int expected, Application application) { Assert.assertEquals(expected, application.getUsedResources().getMemory()); Modified: hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java?rev=1565792&r1=1565791&r2=1565792&view=diff ============================================================================== --- hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java (original) +++ hadoop/common/branches/branch-2.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java Fri Feb 7 20:33:01 2014 @@ -41,9 +41,11 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; @@ -1384,29 +1386,24 @@ public class TestRMWebServicesApps exten rm.stop(); } - @Test (timeout = 20000) + @Test public void testMultipleAppAttempts() throws JSONException, Exception { rm.start(); - MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 8192); + MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048); RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1"); - MockAM am = MockRM.launchAndRegisterAM(app1, rm, amNodeManager); + amNodeManager.nodeHeartbeat(true); int maxAppAttempts = rm.getConfig().getInt( YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); assertTrue(maxAppAttempts > 1); - int numAttempt = 1; - while (true) { - // fail the AM by sending CONTAINER_FINISHED event without registering. - amNodeManager.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); - am.waitForState(RMAppAttemptState.FAILED); - if (numAttempt == maxAppAttempts) { - rm.waitForState(app1.getApplicationId(), RMAppState.FAILED); - break; - } - // wait for app to start a new attempt. + int retriesLeft = maxAppAttempts; + while (--retriesLeft > 0) { + RMAppEvent event = + new RMAppFailedAttemptEvent(app1.getApplicationId(), + RMAppEventType.ATTEMPT_FAILED, ""); + app1.handle(event); rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); - am = MockRM.launchAndRegisterAM(app1, rm, amNodeManager); - numAttempt++; + amNodeManager.nodeHeartbeat(true); } assertEquals("incorrect number of attempts", maxAppAttempts, app1.getAppAttempts().values().size());
