Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Tue Aug 19 23:49:39 2014 @@ -18,13 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.File; import java.io.FileWriter; @@ -34,6 +38,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -42,8 +47,6 @@ import java.util.Set; 
import javax.xml.parsers.ParserConfigurationException; -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -52,6 +55,7 @@ import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -64,23 +68,24 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -88,58 +93,28 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.xml.sax.SAXException; import com.google.common.collect.Sets; -public class TestFairScheduler { - - static class MockClock implements Clock { - private long time = 0; - 
@Override - public long getTime() { - return time; - } - - public void tick(int seconds) { - time = time + seconds * 1000; - } - - } - - final static String TEST_DIR = new File(System.getProperty("test.build.data", - "/tmp")).getAbsolutePath(); - - final static String ALLOC_FILE = new File(TEST_DIR, - "test-queues").getAbsolutePath(); +@SuppressWarnings("unchecked") +public class TestFairScheduler extends FairSchedulerTestBase { + private final static String ALLOC_FILE = + new File(TEST_DIR, "test-queues").getAbsolutePath(); - private FairScheduler scheduler; - private ResourceManager resourceManager; - private Configuration conf; - private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - - private int APP_ID = 1; // Incrementing counter for schedling apps - private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts - - // HELPER METHODS @Before public void setUp() throws IOException { scheduler = new FairScheduler(); conf = createConfiguration(); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); - conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, - 1024); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240); - // All tests assume only one assignment per node update - conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); resourceManager = new ResourceManager(); resourceManager.init(conf); @@ -150,6 +125,8 @@ public class TestFairScheduler { // to initialize the master key resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey(); + + scheduler.setRMContext(resourceManager.getRMContext()); } @After @@ -163,12 +140,12 @@ public class TestFairScheduler { @Test (timeout = 30000) public void testConfValidation() throws Exception { - ResourceScheduler scheduler = new FairScheduler(); + FairScheduler scheduler = new FairScheduler(); Configuration conf = new YarnConfiguration(); 
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); try { - scheduler.reinitialize(conf, null); + scheduler.serviceInit(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) { @@ -182,7 +159,7 @@ public class TestFairScheduler { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1); try { - scheduler.reinitialize(conf, null); + scheduler.serviceInit(conf); fail("Exception is expected because the min vcores allocation is" + " larger than the max vcores allocation."); } catch (YarnRuntimeException e) { @@ -193,107 +170,6 @@ public class TestFairScheduler { } } - private Configuration createConfiguration() { - Configuration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, - ResourceScheduler.class); - return conf; - } - - private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { - ApplicationId appIdImpl = ApplicationId.newInstance(0, appId); - ApplicationAttemptId attId = - ApplicationAttemptId.newInstance(appIdImpl, attemptId); - return attId; - } - - private ResourceRequest createResourceRequest(int memory, String host, - int priority, int numContainers, boolean relaxLocality) { - return createResourceRequest(memory, 1, host, priority, numContainers, - relaxLocality); - } - - private ResourceRequest createResourceRequest(int memory, int vcores, String host, - int priority, int numContainers, boolean relaxLocality) { - ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); - request.setCapability(BuilderUtils.newResource(memory, vcores)); - request.setResourceName(host); - request.setNumContainers(numContainers); - Priority prio = recordFactory.newRecordInstance(Priority.class); - 
prio.setPriority(priority); - request.setPriority(prio); - request.setRelaxLocality(relaxLocality); - return request; - } - - /** - * Creates a single container priority-1 request and submits to - * scheduler. - */ - private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, - String userId) { - return createSchedulingRequest(memory, queueId, userId, 1); - } - - private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, - String queueId, String userId) { - return createSchedulingRequest(memory, vcores, queueId, userId, 1); - } - - private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, - String userId, int numContainers) { - return createSchedulingRequest(memory, queueId, userId, numContainers, 1); - } - - private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, - String queueId, String userId, int numContainers) { - return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1); - } - - private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, - String userId, int numContainers, int priority) { - return createSchedulingRequest(memory, 1, queueId, userId, numContainers, - priority); - } - - private ApplicationAttemptId createSchedulingRequest(int memory, int vcores, - String queueId, String userId, int numContainers, int priority) { - ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(id.getApplicationId(), queueId, userId); - // This conditional is for testAclSubmitApplication where app is rejected - // and no app is added. 
- if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) { - scheduler.addApplicationAttempt(id, false); - } - List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); - ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, numContainers, true); - ask.add(request); - scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null); - return id; - } - - private void createSchedulingRequestExistingApplication(int memory, int priority, - ApplicationAttemptId attId) { - ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY, - priority, 1, true); - createSchedulingRequestExistingApplication(request, attId); - } - - private void createSchedulingRequestExistingApplication(int memory, int vcores, - int priority, ApplicationAttemptId attId) { - ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, 1, true); - createSchedulingRequestExistingApplication(request, attId); - } - - private void createSchedulingRequestExistingApplication(ResourceRequest request, - ApplicationAttemptId attId) { - List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); - ask.add(request); - scheduler.allocate(attId, ask, new ArrayList<ContainerId>(), null, null); - } - // TESTS @Test(timeout=2000) @@ -315,6 +191,8 @@ public class TestFairScheduler { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 128); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); Assert.assertEquals(true, scheduler.assignMultiple); Assert.assertEquals(3, scheduler.maxAssign); @@ -342,6 +220,7 @@ public class TestFairScheduler { FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); + fs.init(conf); fs.reinitialize(conf, null); 
Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory()); Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores()); @@ -359,8 +238,9 @@ public class TestFairScheduler { FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); - fs.reinitialize(conf, null); - Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory()); + fs.init(conf); + fs.reinitialize(conf, null); + Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory()); Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores()); Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores()); @@ -368,6 +248,8 @@ public class TestFairScheduler { @Test public void testAggregateCapacityTracking() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -376,23 +258,25 @@ public class TestFairScheduler { .newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); - assertEquals(1024, scheduler.getClusterCapacity().getMemory()); + assertEquals(1024, scheduler.getClusterResource().getMemory()); // Add another node RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); - assertEquals(1536, scheduler.getClusterCapacity().getMemory()); + assertEquals(1536, scheduler.getClusterResource().getMemory()); // Remove the first node NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1); scheduler.handle(nodeEvent3); - assertEquals(512, scheduler.getClusterCapacity().getMemory()); + assertEquals(512, 
scheduler.getClusterResource().getMemory()); } @Test public void testSimpleFairShareCalculation() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add one big node (only care about aggregate capacity) @@ -405,6 +289,7 @@ public class TestFairScheduler { // Have two queues which want entire cluster capacity createSchedulingRequest(10 * 1024, "queue1", "user1"); createSchedulingRequest(10 * 1024, "queue2", "user1"); + createSchedulingRequest(10 * 1024, "root.default", "user1"); scheduler.update(); @@ -420,6 +305,8 @@ public class TestFairScheduler { @Test public void testSimpleHierarchicalFairShareCalculation() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add one big node (only care about aggregate capacity) @@ -433,6 +320,7 @@ public class TestFairScheduler { // Have two queues which want entire cluster capacity createSchedulingRequest(10 * 1024, "parent.queue2", "user1"); createSchedulingRequest(10 * 1024, "parent.queue3", "user1"); + createSchedulingRequest(10 * 1024, "root.default", "user1"); scheduler.update(); @@ -453,6 +341,8 @@ public class TestFairScheduler { @Test public void testHierarchicalQueuesSimilarParents() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); QueueManager queueManager = scheduler.getQueueManager(); @@ -477,6 +367,8 @@ public class TestFairScheduler { @Test public void testSchedulerRootQueueMetrics() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -516,6 +408,8 @@ public class TestFairScheduler { @Test (timeout = 5000) public void testSimpleContainerAllocation() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node 
@@ -564,6 +458,8 @@ public class TestFairScheduler { @Test (timeout = 5000) public void testSimpleContainerReservation() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -618,48 +514,27 @@ public class TestFairScheduler { @Test public void testUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMContext rmContext = resourceManager.getRMContext(); - Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps(); ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); - RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf, - null, null, null, ApplicationSubmissionContext.newInstance(null, null, - null, null, null, false, false, 0, null, null), null, null, 0, null, null); - appsMap.put(appAttemptId.getApplicationId(), rmApp); - - AppAddedSchedulerEvent appAddedEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default", - "user1"); - scheduler.handle(appAddedEvent); - AppAttemptAddedSchedulerEvent attempAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attempAddedEvent); + createApplicationWithAMResource(appAttemptId, "default", "user1", null); assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true) .getRunnableAppSchedulables().size()); assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true) .getRunnableAppSchedulables().size()); - assertEquals("root.user1", rmApp.getQueue()); + assertEquals("root.user1", resourceManager.getRMContext().getRMApps() + .get(appAttemptId.getApplicationId()).getQueue()); } @Test public void testNotUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); + scheduler.init(conf); + scheduler.start(); 
scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMContext rmContext = resourceManager.getRMContext(); - Map<ApplicationId, RMApp> appsMap = rmContext.getRMApps(); ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); - RMApp rmApp = new RMAppImpl(appAttemptId.getApplicationId(), rmContext, conf, - null, null, null, ApplicationSubmissionContext.newInstance(null, null, - null, null, null, false, false, 0, null, null), null, null, 0, null, null); - appsMap.put(appAttemptId.getApplicationId(), rmApp); - - AppAddedSchedulerEvent appAddedEvent = - new AppAddedSchedulerEvent(appAttemptId.getApplicationId(), "default", - "user2"); - scheduler.handle(appAddedEvent); - AppAttemptAddedSchedulerEvent attempAddedEvent = - new AppAttemptAddedSchedulerEvent(appAttemptId, false); - scheduler.handle(attempAddedEvent); + createApplicationWithAMResource(appAttemptId, "default", "user2", null); assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true) .getRunnableAppSchedulables().size()); assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true) @@ -670,6 +545,8 @@ public class TestFairScheduler { @Test public void testEmptyQueueName() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // only default queue @@ -690,8 +567,10 @@ public class TestFairScheduler { @Test public void testAssignToQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); @@ -708,6 +587,8 @@ public class TestFairScheduler { @Test public void testAssignToNonLeafQueueReturnsNull() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.init(conf); + scheduler.start(); 
scheduler.reinitialize(conf, resourceManager.getRMContext()); scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true); @@ -725,6 +606,8 @@ public class TestFairScheduler { public void testQueuePlacementWithPolicy() throws Exception { conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId appId; @@ -737,8 +620,11 @@ public class TestFairScheduler { rules.add(new QueuePlacementRule.Default().initialize(true, null)); Set<String> queues = Sets.newHashSet("root.user1", "root.user3group", "root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2"); + Map<FSQueueType, Set<String>> configuredQueues = new HashMap<FSQueueType, Set<String>>(); + configuredQueues.put(FSQueueType.LEAF, queues); + configuredQueues.put(FSQueueType.PARENT, new HashSet<String>()); scheduler.getAllocationConfiguration().placementPolicy = - new QueuePlacementPolicy(rules, queues, conf); + new QueuePlacementPolicy(rules, configuredQueues, conf); appId = createSchedulingRequest(1024, "somequeue", "user1"); assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user1"); @@ -758,7 +644,7 @@ public class TestFairScheduler { rules.add(new QueuePlacementRule.Specified().initialize(true, null)); rules.add(new QueuePlacementRule.Default().initialize(true, null)); scheduler.getAllocationConfiguration().placementPolicy = - new QueuePlacementPolicy(rules, queues, conf); + new QueuePlacementPolicy(rules, configuredQueues, conf); appId = createSchedulingRequest(1024, "somequeue", "user1"); assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "somequeue", "otheruser"); @@ -782,7 +668,9 @@ public class TestFairScheduler { out.println("</queue>"); 
out.println("</allocations>"); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add one big node (only care about aggregate capacity) @@ -809,23 +697,113 @@ public class TestFairScheduler { } } } + + @Test + public void testNestedUserQueue() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"user1group\" type=\"parent\">"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + out.println("<queuePlacementPolicy>"); + out.println("<rule name=\"specified\" create=\"false\" />"); + out.println("<rule name=\"nestedUserQueue\">"); + out.println(" <rule name=\"primaryGroup\" create=\"false\" />"); + out.println("</rule>"); + out.println("<rule name=\"default\" />"); + out.println("</queuePlacementPolicy>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); + + FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default", + "user1"); + + assertEquals("root.user1group.user1", user1Leaf.getName()); + } + + @Test + public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"parentq\" type=\"parent\">"); + out.println("<minResources>1024mb,0vcores</minResources>"); + out.println("</queue>"); + 
out.println("<queuePlacementPolicy>"); + out.println("<rule name=\"nestedUserQueue\">"); + out.println(" <rule name=\"specified\" create=\"false\" />"); + out.println("</rule>"); + out.println("<rule name=\"default\" />"); + out.println("</queuePlacementPolicy>"); + out.println("</allocations>"); + out.close(); + + RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); + RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + int capacity = 16 * 1024; + // create node with 16 G + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity), + 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + // user1,user2 submit their apps to parentq and create user queues + createSchedulingRequest(10 * 1024, "root.parentq", "user1"); + createSchedulingRequest(10 * 1024, "root.parentq", "user2"); + // user3 submits app in default queue + createSchedulingRequest(10 * 1024, "root.default", "user3"); + + scheduler.update(); + + Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager() + .getLeafQueues(); + + for (FSLeafQueue leaf : leafQueues) { + if (leaf.getName().equals("root.parentq.user1") + || leaf.getName().equals("root.parentq.user2")) { + // assert that the fair share is 1/4th node1's capacity + assertEquals(capacity / 4, leaf.getFairShare().getMemory()); + // assert weights are equal for both the user queues + assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0); + } + } + } + /** * Make allocation requests and ensure they are reflected in queue demand. 
*/ @Test public void testQueueDemandCalculation() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId id11 = createAppAttemptId(1, 1); - scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1"); - scheduler.addApplicationAttempt(id11, false); + scheduler.addApplication(id11.getApplicationId(), "root.queue1", "user1", false); + scheduler.addApplicationAttempt(id11, false, false); ApplicationAttemptId id21 = createAppAttemptId(2, 1); - scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id21, false); + scheduler.addApplication(id21.getApplicationId(), "root.queue2", "user1", false); + scheduler.addApplicationAttempt(id21, false, false); ApplicationAttemptId id22 = createAppAttemptId(2, 2); - scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1"); - scheduler.addApplicationAttempt(id22, false); + scheduler.addApplication(id22.getApplicationId(), "root.queue2", "user1", false); + scheduler.addApplicationAttempt(id22, false, false); int minReqSize = FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB; @@ -865,6 +843,8 @@ public class TestFairScheduler { @Test public void testAppAdditionAndRemoval() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId attemptId =createAppAttemptId(1, 1); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default", @@ -915,6 +895,8 @@ public class TestFairScheduler { out.println("</allocations>"); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); QueueManager queueManager = scheduler.getQueueManager(); @@ -947,7 +929,9 @@ public class TestFairScheduler { out.println("</queue>"); out.println("</allocations>"); out.close(); - + + 
scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); QueueManager queueManager = scheduler.getQueueManager(); @@ -974,6 +958,8 @@ public class TestFairScheduler { out.println("</allocations>"); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add one big node (only care about aggregate capacity) @@ -1031,8 +1017,10 @@ public class TestFairScheduler { out.println("</allocations>"); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + // Add one big node (only care about aggregate capacity) RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, @@ -1075,13 +1063,13 @@ public class TestFairScheduler { @Test (timeout = 5000) /** - * Make sure containers are chosen to be preempted in the correct order. Right - * now this means decreasing order of priority. + * Make sure containers are chosen to be preempted in the correct order. 
*/ public void testChoiceOfPreemptedContainers() throws Exception { conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); - conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); MockClock clock = new MockClock(); scheduler.setClock(clock); @@ -1098,141 +1086,215 @@ public class TestFairScheduler { out.println("<queue name=\"queueC\">"); out.println("<weight>.25</weight>"); out.println("</queue>"); - out.println("<queue name=\"queueD\">"); + out.println("<queue name=\"default\">"); out.println("<weight>.25</weight>"); out.println("</queue>"); out.println("</allocations>"); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - // Create four nodes + // Create two nodes RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1, + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2, + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); - RMNode node3 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3, - "127.0.0.3"); - NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); - scheduler.handle(nodeEvent3); - - - // Queue A and B each request three containers + // Queue A and B each request two applications ApplicationAttemptId app1 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "queueA", 
"user1", 1, 1); + createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1); ApplicationAttemptId app2 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); - ApplicationAttemptId app3 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); + createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3); + createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1); + createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3); ApplicationAttemptId app4 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); - ApplicationAttemptId app5 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); - ApplicationAttemptId app6 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); + createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3); + createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4); scheduler.update(); + scheduler.getQueueManager().getLeafQueue("queueA", true) + .setPolicy(SchedulingPolicy.parse("fifo")); + scheduler.getQueueManager().getLeafQueue("queueB", true) + .setPolicy(SchedulingPolicy.parse("fair")); + // Sufficient node check-ins to fully schedule containers - for (int i = 0; i < 2; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + for (int i = 0; i < 4; i++) { scheduler.handle(nodeUpdate1); - - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(nodeUpdate2); - - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); - scheduler.handle(nodeUpdate3); } - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(1, 
scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size()); - - // Now new requests arrive from queues C and D - ApplicationAttemptId app7 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); - ApplicationAttemptId app8 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); - ApplicationAttemptId app9 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); - - ApplicationAttemptId app10 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1); - ApplicationAttemptId app11 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2); - ApplicationAttemptId app12 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3); - - scheduler.update(); - - // We should be able to claw back one container from A and B each. - // Make sure it is lowest priority container. 
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - - // First verify we are adding containers to preemption list for the application - assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(), - scheduler.getSchedulerApp(app3).getPreemptionContainers())); - assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(), - scheduler.getSchedulerApp(app6).getPreemptionContainers())); + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + + // Now new requests arrive from queueC and default + createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); + scheduler.update(); + + // We should be able to claw back one container from queueA and queueB each. + scheduler.preemptResources(Resources.createResource(2 * 1024)); + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + + // First verify we are adding containers to preemption list for the app. + // For queueA (fifo), app2 is selected. + // For queueB (fair), app4 is selected. 
+ assertTrue("App2 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app2).getLiveContainers(), + scheduler.getSchedulerApp(app2).getPreemptionContainers())); + assertTrue("App4 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app4).getLiveContainers(), + scheduler.getSchedulerApp(app4).getPreemptionContainers())); // Pretend 15 seconds have passed clock.tick(15); // Trigger a kill by insisting we want containers back - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); + scheduler.preemptResources(Resources.createResource(2 * 1024)); // At this point the containers should have been killed (since we are not simulating AM) - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + // Inside each app, containers are sorted according to their priorities. + // Containers with priority 4 are preempted for app2 and app4. 
+ Set<RMContainer> set = new HashSet<RMContainer>(); + for (RMContainer container : + scheduler.getSchedulerApp(app2).getLiveContainers()) { + if (container.getAllocatedPriority().getPriority() == 4) { + set.add(container); + } + } + for (RMContainer container : + scheduler.getSchedulerApp(app4).getLiveContainers()) { + if (container.getAllocatedPriority().getPriority() == 4) { + set.add(container); + } + } + assertTrue("Containers with priority=4 in app2 and app4 should be " + + "preempted.", set.isEmpty()); // Trigger a kill by insisting we want containers back - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); + scheduler.preemptResources(Resources.createResource(2 * 1024)); // Pretend 15 seconds have passed clock.tick(15); // We should be able to claw back another container from A and B each. - // Make sure it is lowest priority container. - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); - - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + // For queueA (fifo), continue preempting from app2. + // For queueB (fair), even though app4 has a lowest priority container with p=4, it 
+ scheduler.preemptResources(Resources.createResource(2 * 1024)); + + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size()); assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); // Now A and B are below fair share, so preemption shouldn't do anything - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); + scheduler.preemptResources(Resources.createResource(2 * 1024)); + assertTrue("App1 should have no container to be preempted", + scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty()); + assertTrue("App2 should have no container to be preempted", + scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty()); + assertTrue("App3 should have no container to be preempted", + scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty()); + assertTrue("App4 should have no container to be preempted", + scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty()); + } + + @Test + public void testPreemptionIsNotDelayedToNextRound() throws Exception { + 
conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); + + MockClock clock = new MockClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>8</weight>"); + out.println("<queue name=\"queueA1\" />"); + out.println("<queue name=\"queueA2\" />"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>2</weight>"); + out.println("</queue>"); + out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node of 8G + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(8 * 1024, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Run apps in queueA.A1 and queueB + ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1, + "queueA.queueA1", "user1", 7, 1); + // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1); + ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB", + "user2", 1, 1); + + scheduler.update(); + + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + for (int i = 0; i < 8; i++) { + scheduler.handle(nodeUpdate1); + } + + // verify if the apps got the containers they requested + assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + + // Now submit an app in queueA.queueA2 
+ ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1, + "queueA.queueA2", "user3", 7, 1); + scheduler.update(); + + // Let 11 sec pass + clock.tick(11); + + scheduler.update(); + Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager() + .getLeafQueue("queueA.queueA2", false), clock.getTime()); + assertEquals(3277, toPreempt.getMemory()); + + // verify if the 3 containers required by queueA2 are preempted in the same + // round + scheduler.preemptResources(toPreempt); + assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers() + .size()); } @Test (timeout = 5000) @@ -1271,6 +1333,8 @@ public class TestFairScheduler { out.println("</allocations>"); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Create four nodes @@ -1365,9 +1429,11 @@ public class TestFairScheduler { assertEquals( 1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); } - + @Test (timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -1410,8 +1476,10 @@ public class TestFairScheduler { out.println("</allocations>"); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + // Add a node RMNode node1 = MockNodes @@ -1451,6 +1519,8 @@ public class TestFairScheduler { @Test (timeout = 5000) public void testReservationWhileMultiplePriorities() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -1466,7 +1536,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = 
scheduler.getSchedulerApp(attId); assertEquals(1, app.getLiveContainers().size()); ContainerId containerId = scheduler.getSchedulerApp(attId) @@ -1531,21 +1601,25 @@ public class TestFairScheduler { out.println("</allocations>"); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname", 1); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname2", 1); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); assertNotNull("The application was not allowed", app1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); assertNull("The application was allowed", app2); } @Test (timeout = 5000) public void testMultipleNodesSingleRackRequest() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = @@ -1563,8 +1637,8 @@ public class TestFairScheduler { scheduler.handle(nodeEvent2); ApplicationAttemptId appId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - scheduler.addApplication(appId.getApplicationId(), "queue1", "user1"); - scheduler.addApplicationAttempt(appId, false); + scheduler.addApplication(appId.getApplicationId(), "queue1", "user1", false); + scheduler.addApplicationAttempt(appId, false, false); // 1 request with 2 nodes on the same rack. 
another request with 1 node on // a different rack @@ -1595,6 +1669,8 @@ public class TestFairScheduler { @Test (timeout = 5000) public void testFifoWithinQueue() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = @@ -1609,8 +1685,8 @@ public class TestFairScheduler { "user1", 2); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); queue1.setPolicy(new FifoPolicy()); @@ -1639,6 +1715,8 @@ public class TestFairScheduler { @Test(timeout = 3000) public void testMaxAssign() throws Exception { conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = @@ -1650,7 +1728,42 @@ public class TestFairScheduler { ApplicationAttemptId attId = createSchedulingRequest(1024, "root.default", "user", 8); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); + + // set maxAssign to 2: only 2 containers should be allocated + scheduler.maxAssign = 2; + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Incorrect number of containers allocated", 2, app + .getLiveContainers().size()); + + // set maxAssign to -1: all remaining containers should be allocated + scheduler.maxAssign = -1; + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Incorrect number of containers allocated", 8, app + .getLiveContainers().size()); + } + + @Test(timeout = 3000) + public void testMaxAssignWithZeroMemoryContainers() throws Exception { + 
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + + ApplicationAttemptId attId = + createSchedulingRequest(0, 1, "root.default", "user", 8); + FSAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated scheduler.maxAssign = 2; @@ -1682,6 +1795,8 @@ public class TestFairScheduler { */ @Test(timeout = 5000) public void testAssignContainer() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); final String user = "user1"; @@ -1712,10 +1827,10 @@ public class TestFairScheduler { ApplicationAttemptId attId4 = createSchedulingRequest(1024, fifoQueue, user, 4); - FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1); - FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3); - FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); scheduler.getQueueManager().getLeafQueue(fifoQueue, true) .setPolicy(SchedulingPolicy.parse("fifo")); @@ -1765,9 +1880,11 @@ public class TestFairScheduler { out.println("</queue>"); out.println("</allocations>"); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + int appId = 
this.APP_ID++; String user = "usernotallow"; String queue = "queue1"; @@ -1802,7 +1919,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = ApplicationAttemptId.newInstance(applicationId, this.ATTEMPT_ID++); - scheduler.addApplication(attId.getApplicationId(), queue, user); + scheduler.addApplication(attId.getApplicationId(), queue, user, false); numTries = 0; while (application.getFinishTime() == 0 && numTries < MAX_TRIES) { @@ -1816,6 +1933,8 @@ public class TestFairScheduler { @Test public void testReservationThatDoesntFit() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = @@ -1830,7 +1949,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(0, app.getLiveContainers().size()); assertEquals(0, app.getReservedContainers().size()); @@ -1844,6 +1963,8 @@ public class TestFairScheduler { @Test public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); @@ -1872,6 +1993,8 @@ public class TestFairScheduler { @Test public void testStrictLocality() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); @@ -1899,7 +2022,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = 
scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -1912,6 +2035,8 @@ public class TestFairScheduler { @Test public void testCancelStrictLocality() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); @@ -1938,7 +2063,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -1962,6 +2087,8 @@ public class TestFairScheduler { */ @Test public void testReservationsStrictLocality() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); @@ -1971,7 +2098,7 @@ public class TestFairScheduler { ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1", "user1", 0); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true); ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true); @@ -2002,6 +2129,8 @@ public class TestFairScheduler { @Test public void testNoMoreCpuOnNode() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1), @@ -2011,7 +2140,7 @@ public class 
TestFairScheduler { ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", "user1", 2); - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); @@ -2023,6 +2152,8 @@ public class TestFairScheduler { @Test public void testBasicDRFAssignment() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5)); @@ -2031,13 +2162,13 @@ public class TestFairScheduler { ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterCapacity()); + drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.update(); @@ -2063,6 +2194,8 @@ public class TestFairScheduler { */ @Test public void testBasicDRFWithQueues() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7), @@ -2072,16 +2205,16 @@ public class TestFairScheduler { ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", "user1", 2); - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = 
createSchedulingRequest(2048, 2, "queue1", "user1", 2); - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterCapacity()); + drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.update(); @@ -2099,6 +2232,8 @@ public class TestFairScheduler { @Test public void testDRFHierarchicalQueues() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12), @@ -2109,22 +2244,22 @@ public class TestFairScheduler { ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 
2, "queue2", "user1", 2); Thread.sleep(3); // so that start times will be different - FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); - drfPolicy.initialize(scheduler.getClusterCapacity()); + drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1.subqueue1").setPolicy(drfPolicy); @@ -2167,7 +2302,9 @@ public class TestFairScheduler { public void testHostPortNodeName() throws Exception { conf.setBoolean(YarnConfiguration .RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); - scheduler.reinitialize(conf, + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1", 1); @@ -2201,7 +2338,7 @@ public class TestFairScheduler { NodeUpdateSchedulerEvent(node2); // no matter how many heartbeats, node2 should never get a container - FSSchedulerApp app = scheduler.getSchedulerApp(attId1); + FSAppAttempt app = scheduler.getSchedulerApp(attId1); for (int i = 0; i < 10; i++) { scheduler.handle(node2UpdateEvent); assertEquals(0, app.getLiveContainers().size()); @@ -2213,14 +2350,14 @@ public class TestFairScheduler { } private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) { - FSSchedulerApp app = scheduler.getSchedulerApp(attId); + FSAppAttempt app = scheduler.getSchedulerApp(attId); FSLeafQueue queue = app.getQueue(); - Collection<AppSchedulable> runnableApps = + Collection<FSAppAttempt> runnableApps = queue.getRunnableAppSchedulables(); - Collection<AppSchedulable> nonRunnableApps = + Collection<FSAppAttempt> nonRunnableApps = queue.getNonRunnableAppSchedulables(); - assertEquals(runnable, 
runnableApps.contains(app.getAppSchedulable())); - assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable())); + assertEquals(runnable, runnableApps.contains(app)); + assertEquals(!runnable, nonRunnableApps.contains(app)); } private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue, @@ -2247,9 +2384,11 @@ public class TestFairScheduler { out.println("</user>"); out.println("</allocations>"); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + // exceeds no limits ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1"); verifyAppRunnable(attId1, true); @@ -2283,6 +2422,284 @@ public class TestFairScheduler { } @Test + public void testQueueMaxAMShare() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queue1\">"); + out.println("<maxAMShare>0.2</maxAMShare>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(20480, 20), + 0, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true); + assertEquals("Queue queue1's fair share should be 0", 0, queue1 + .getFairShare().getMemory()); + + createSchedulingRequest(1 * 1024, "root.default", "user1"); + scheduler.update(); + scheduler.handle(updateEvent); + + Resource amResource1 = Resource.newInstance(1024, 1); + Resource amResource2 = 
Resource.newInstance(2048, 2); + Resource amResource3 = Resource.newInstance(1860, 2); + int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); + // Exceeds no limits + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queue1", "user1", amResource1); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application1's AM requests 1024 MB memory", + 1024, app1.getAMResource().getMemory()); + assertEquals("Application1's AM should be running", + 1, app1.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 1024 MB memory", + 1024, queue1.getAmResourceUsage().getMemory()); + + // Exceeds no limits + ApplicationAttemptId attId2 = createAppAttemptId(2, 1); + createApplicationWithAMResource(attId2, "queue1", "user1", amResource1); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application2's AM requests 1024 MB memory", + 1024, app2.getAMResource().getMemory()); + assertEquals("Application2's AM should be running", + 1, app2.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Exceeds queue limit + ApplicationAttemptId attId3 = createAppAttemptId(3, 1); + createApplicationWithAMResource(attId3, "queue1", "user1", amResource1); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application3's AM requests 1024 MB memory", + 1024, app3.getAMResource().getMemory()); + assertEquals("Application3's AM should not be running", + 0, 
app3.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Still can run non-AM container + createSchedulingRequestExistingApplication(1024, 1, attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application1 should have two running containers", + 2, app1.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Remove app1, app3's AM should become running + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = + new AppAttemptRemovedSchedulerEvent(attId1, RMAppAttemptState.FINISHED, false); + scheduler.update(); + scheduler.handle(appRemovedEvent1); + scheduler.handle(updateEvent); + assertEquals("Application1's AM should be finished", + 0, app1.getLiveContainers().size()); + assertEquals("Application3's AM should be running", + 1, app3.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Exceeds queue limit + ApplicationAttemptId attId4 = createAppAttemptId(4, 1); + createApplicationWithAMResource(attId4, "queue1", "user1", amResource2); + createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application4's AM requests 2048 MB memory", + 2048, app4.getAMResource().getMemory()); + assertEquals("Application4's AM should not be running", + 0, app4.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Exceeds queue limit + ApplicationAttemptId attId5 = createAppAttemptId(5, 1); + createApplicationWithAMResource(attId5, "queue1", "user1", amResource2); + createSchedulingRequestExistingApplication(2048, 
2, amPriority, attId5); + FSAppAttempt app5 = scheduler.getSchedulerApp(attId5); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application5's AM requests 2048 MB memory", + 2048, app5.getAMResource().getMemory()); + assertEquals("Application5's AM should not be running", + 0, app5.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Remove un-running app doesn't affect others + AppAttemptRemovedSchedulerEvent appRemovedEvent4 = + new AppAttemptRemovedSchedulerEvent(attId4, RMAppAttemptState.KILLED, false); + scheduler.handle(appRemovedEvent4); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application5's AM should not be running", + 0, app5.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Remove app2 and app3, app5's AM should become running + AppAttemptRemovedSchedulerEvent appRemovedEvent2 = + new AppAttemptRemovedSchedulerEvent(attId2, RMAppAttemptState.FINISHED, false); + AppAttemptRemovedSchedulerEvent appRemovedEvent3 = + new AppAttemptRemovedSchedulerEvent(attId3, RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent2); + scheduler.handle(appRemovedEvent3); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application2's AM should be finished", + 0, app2.getLiveContainers().size()); + assertEquals("Application3's AM should be finished", + 0, app3.getLiveContainers().size()); + assertEquals("Application5's AM should be running", + 1, app5.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Check amResource normalization + ApplicationAttemptId attId6 = createAppAttemptId(6, 1); + createApplicationWithAMResource(attId6, "queue1", "user1", amResource3); + 
createSchedulingRequestExistingApplication(1860, 2, amPriority, attId6); + FSAppAttempt app6 = scheduler.getSchedulerApp(attId6); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application6's AM should not be running", + 0, app6.getLiveContainers().size()); + assertEquals("Application6's AM requests 2048 MB memory", + 2048, app6.getAMResource().getMemory()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Remove all apps + AppAttemptRemovedSchedulerEvent appRemovedEvent5 = + new AppAttemptRemovedSchedulerEvent(attId5, RMAppAttemptState.FINISHED, false); + AppAttemptRemovedSchedulerEvent appRemovedEvent6 = + new AppAttemptRemovedSchedulerEvent(attId6, RMAppAttemptState.FINISHED, false); + scheduler.handle(appRemovedEvent5); + scheduler.handle(appRemovedEvent6); + scheduler.update(); + assertEquals("Queue1's AM resource usage should be 0", + 0, queue1.getAmResourceUsage().getMemory()); + } + + @Test + public void testQueueMaxAMShareDefault() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queue1\">"); + out.println("</queue>"); + out.println("<queue name=\"queue2\">"); + out.println("<maxAMShare>1.0</maxAMShare>"); + out.println("</queue>"); + out.println("<queue name=\"queue3\">"); + out.println("</queue>"); + out.println("<queue name=\"queue4\">"); + out.println("</queue>"); + out.println("<queue name=\"queue5\">"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(8192, 20), + 0, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent = new 
NodeAddedSchedulerEvent(node); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeEvent); + scheduler.update(); + + FSLeafQueue queue1 = + scheduler.getQueueManager().getLeafQueue("queue1", true); + assertEquals("Queue queue1's fair share should be 0", 0, queue1 + .getFairShare().getMemory()); + FSLeafQueue queue2 = + scheduler.getQueueManager().getLeafQueue("queue2", true); + assertEquals("Queue queue2's fair share should be 0", 0, queue2 + .getFairShare().getMemory()); + FSLeafQueue queue3 = + scheduler.getQueueManager().getLeafQueue("queue3", true); + assertEquals("Queue queue3's fair share should be 0", 0, queue3 + .getFairShare().getMemory()); + FSLeafQueue queue4 = + scheduler.getQueueManager().getLeafQueue("queue4", true); + assertEquals("Queue queue4's fair share should be 0", 0, queue4 + .getFairShare().getMemory()); + FSLeafQueue queue5 = + scheduler.getQueueManager().getLeafQueue("queue5", true); + assertEquals("Queue queue5's fair share should be 0", 0, queue5 + .getFairShare().getMemory()); + + List<String> queues = Arrays.asList("root.default", "root.queue3", + "root.queue4", "root.queue5"); + for (String queue : queues) { + createSchedulingRequest(1 * 1024, queue, "user1"); + scheduler.update(); + scheduler.handle(updateEvent); + } + + Resource amResource1 = Resource.newInstance(2048, 1); + int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); + + // Exceeds queue limit, but default maxAMShare is -1.0 so it doesn't matter + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queue1", "test1", amResource1); + createSchedulingRequestExistingApplication(2048, 1, amPriority, attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application1's AM requests 2048 MB memory", + 2048, app1.getAMResource().getMemory()); + assertEquals("Application1's AM should be 
running", + 1, app1.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + // Exceeds queue limit, and maxAMShare is 1.0 + ApplicationAttemptId attId2 = createAppAttemptId(2, 1); + createApplicationWithAMResource(attId2, "queue2", "test1", amResource1); + createSchedulingRequestExistingApplication(2048, 1, amPriority, attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); + scheduler.update(); + scheduler.handle(updateEvent); + assertEquals("Application2's AM requests 2048 MB memory", + 2048, app2.getAMResource().getMemory()); + assertEquals("Application2's AM should not be running", + 0, app2.getLiveContainers().size()); + assertEquals("Queue2's AM resource usage should be 0 MB memory", + 0, queue2.getAmResourceUsage().getMemory()); + } + + @Test public void testMaxRunningAppsHierarchicalQueues() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); MockClock clock = new MockClock(); @@ -2301,9 +2718,11 @@ public class TestFairScheduler { out.println("</queue>"); out.println("</allocations>"); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + // exceeds no limits ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1"); verifyAppRunnable(attId1, true); @@ -2363,6 +2782,9 @@ public class TestFairScheduler { Configuration conf = createConfiguration(); conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true); + fs.setRMContext(resourceManager.getRMContext()); + fs.init(conf); + fs.start(); fs.reinitialize(conf, resourceManager.getRMContext()); Assert.assertTrue("Continuous scheduling should be enabled.", fs.isContinuousSchedulingEnabled()); @@ -2380,14 +2802,14 @@ public class TestFairScheduler { fs.handle(nodeEvent2); // available resource - Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 
1024); - Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16); + Assert.assertEquals(fs.getClusterResource().getMemory(), 16 * 1024); + Assert.assertEquals(fs.getClusterResource().getVirtualCores(), 16); // send application request ApplicationAttemptId appAttemptId = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); - fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11"); - fs.addApplicationAttempt(appAttemptId, false); + fs.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false); + fs.addApplicationAttempt(appAttemptId, false, false); List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); @@ -2398,7 +2820,7 @@ public class TestFairScheduler { // at least one pass Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500); - FSSchedulerApp app = fs.getSchedulerApp(appAttemptId); + FSAppAttempt app = fs.getSchedulerApp(appAttemptId); // Wait until app gets resources. 
while (app.getCurrentConsumption().equals(Resources.none())) { } @@ -2429,7 +2851,43 @@ public class TestFairScheduler { Assert.assertEquals(2, nodes.size()); } - + @Test + public void testContinuousSchedulingWithNodeRemoved() throws Exception { + // Disable continuous scheduling, will invoke continuous scheduling once manually + scheduler.init(conf); + scheduler.start(); + Assert.assertTrue("Continuous scheduling should be disabled.", + !scheduler.isContinuousSchedulingEnabled()); + + // Add two nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + Assert.assertEquals("We should have two alive nodes.", + 2, scheduler.getNumClusterNodes()); + + // Remove one node + NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1); + scheduler.handle(removeNode1); + Assert.assertEquals("We should only have one alive node.", + 1, scheduler.getNumClusterNodes()); + + // Invoke the continuous scheduling once + try { + scheduler.continuousSchedulingAttempt(); + } catch (Exception e) { + fail("Exception happened when doing continuous scheduling. 
" + + e.toString()); + } + } + @Test public void testDontAllowUndeclaredPools() throws Exception{ conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); @@ -2443,6 +2901,8 @@ public class TestFairScheduler { out.println("</allocations>"); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); QueueManager queueManager = scheduler.getQueueManager(); @@ -2470,9 +2930,123 @@ public class TestFairScheduler { assertEquals(2, defaultQueue.getRunnableAppSchedulables().size()); } + @Test + public void testDefaultRuleInitializesProperlyWhenPolicyNotConfigured() + throws IOException { + // This test verifies if default rule in queue placement policy + // initializes properly when policy is not configured and + // undeclared pools is not allowed. + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); + + // Create an alloc file with no queue placement policy + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("</allocations>"); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + List<QueuePlacementRule> rules = scheduler.allocConf.placementPolicy + .getRules(); + + for (QueuePlacementRule rule : rules) { + if (rule instanceof Default) { + Default defaultRule = (Default) rule; + assertNotNull(defaultRule.defaultQueueName); + } + } + } + + @Test(timeout=5000) + public void testRecoverRequestAfterPreemption() throws Exception { + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10); + + MockClock clock = new MockClock(); + scheduler.setClock(clock); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + Priority priority = Priority.newInstance(20); + String 
host = "127.0.0.1"; + int GB = 1024; + + // Create Node and raised Node Added event + RMNode node = MockNodes.newNodeInfo(1, + Resources.createResource(16 * 1024, 4), 0, host); + NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); + scheduler.handle(nodeEvent); + + // Create 3 container requests and place it in ask + List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); + ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host, + priority.getPriority(), 1, true); + ResourceRequest rackLocalRequest = createResourceRequest(GB, 1, + node.getRackName(), priority.getPriority(), 1, true); + ResourceRequest offRackRequest = createResourceRequest(GB, 1, + ResourceRequest.ANY, priority.getPriority(), 1, true); + ask.add(nodeLocalRequest); + ask.add(rackLocalRequest); + ask.add(offRackRequest); + + // Create Request and update + ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA", + "user1", ask); + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + scheduler.handle(nodeUpdate); + + assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() + .size()); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); + + // ResourceRequest will be empty once NodeUpdate is completed + Assert.assertNull(app.getResourceRequest(priority, host)); + + ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 1); + RMContainer rmContainer = app.getRMContainer(containerId1); + + // Create a preempt event and register for preemption + scheduler.warnOrKillContainer(rmContainer); + + // Wait for few clock ticks + clock.tick(5); + + // preempt now + scheduler.warnOrKillContainer(rmContainer); + + List<ResourceRequest> requests = rmContainer.getResourceRequests(); + // Once recovered, resource request will be present again in app + Assert.assertEquals(3, requests.size()); + for (ResourceRequest request : 
requests) { + Assert.assertEquals(1, + app.getResourceRequest(priority, request.getResourceName()) + .getNumContainers()); + } + + // Send node heartbeat + scheduler.update(); + scheduler.handle(nodeUpdate); + + List<Container> containers = scheduler.allocate(appAttemptId, + Collections.<ResourceRequest> emptyList(), + Collections.<ContainerId> emptyList(), null, null).getContainers(); + + // Now with updated ResourceRequest, a container is allocated for AM. + Assert.assertTrue(containers.size() == 1); + } + @SuppressWarnings("resource") @Test public void testBlacklistNodes() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); final int GB = 1024; @@ -2486,7 +3060,7 @@ public class TestFairScheduler { ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, "root.default", "user", 1); - FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId); + FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(), @@ -2525,6 +3099,8 @@ public class TestFairScheduler { @Test public void testGetAppsInQueue() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId appAttId1 = @@ -2561,16 +3137,19 @@ public class TestFairScheduler { @Test
[... 222 lines stripped ...]
