Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1361020&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Fri Jul 13 00:43:01 2012 @@ -0,0 +1,992 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestFairScheduler { + + private class MockClock implements Clock { + private long time = 0; + @Override + public long getTime() { + return time; + } + + public void tick(int seconds) { + time = time + seconds * 1000; + } + + } + + final static String TEST_DIR = new File(System.getProperty("test.build.data", + "/tmp")).getAbsolutePath(); + + final static String ALLOC_FILE = new File(TEST_DIR, + "test-queues").getAbsolutePath(); + + private FairScheduler scheduler; + private ResourceManager resourceManager; + private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + + private int APP_ID = 1; // Incrementing counter for scheduling apps + private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts + + // HELPER METHODS + @Before + public void setUp() throws IOException { + scheduler = new FairScheduler(); + Configuration conf = new Configuration(); + // All tests assume only one assignment per node update + conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); + Store store = StoreFactory.getStore(conf); + resourceManager = new ResourceManager(store); + resourceManager.init(conf); + ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); + scheduler.reinitialize(conf,
null, resourceManager.getRMContext()); + } + + @After + public void tearDown() { + scheduler = null; + resourceManager = null; + } + + private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) { + ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class); + ApplicationId appIdImpl = recordFactory.newRecordInstance(ApplicationId.class); + appIdImpl.setId(appId); + attId.setAttemptId(attemptId); + attId.setApplicationId(appIdImpl); + return attId; + } + + + private ResourceRequest createResourceRequest(int memory, String host, int priority, int numContainers) { + ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); + request.setCapability(Resources.createResource(memory)); + request.setHostName(host); + request.setNumContainers(numContainers); + Priority prio = recordFactory.newRecordInstance(Priority.class); + prio.setPriority(priority); + request.setPriority(prio); + return request; + } + + /** + * Creates a single container priority-1 request and submits to + * scheduler. + */ + private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId) { + return createSchedulingRequest(memory, queueId, userId, 1); + } + + private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers) { + return createSchedulingRequest(memory, queueId, userId, numContainers, 1); + } + + private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers, int priority) { + ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + scheduler.addApplication(id, queueId, userId); + List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); + ResourceRequest request = createResourceRequest(memory, "*", priority, numContainers); + ask.add(request); + scheduler.allocate(id, ask, new ArrayList<ContainerId>()); + return id; + } + + // TESTS + + @Test + public void testAggregateCapacityTracking() throws Exception { + // Add a node + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + assertEquals(1024, scheduler.getClusterCapacity().getMemory()); + + // Add another node + RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512)); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + assertEquals(1536, scheduler.getClusterCapacity().getMemory()); + + // Remove the first node + NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1); + scheduler.handle(nodeEvent3); + assertEquals(512, scheduler.getClusterCapacity().getMemory()); + } + + @Test + public void testSimpleFairShareCalculation() { + // Add one big node (only care about aggregate capacity) + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Have two queues which want entire cluster capacity + createSchedulingRequest(10 * 1024, "queue1", "user1"); + createSchedulingRequest(10 * 1024, "queue2", "user1"); + + scheduler.update(); + + Collection<FSQueue> queues = scheduler.getQueueManager().getQueues(); + assertEquals(3, queues.size()); + + for (FSQueue p : queues) { + if (!p.getName().equals("default")) { + assertEquals(5120, p.getQueueSchedulable().getFairShare().getMemory()); + } + } + } + +
@Test + public void testSimpleContainerAllocation() { + // Add a node + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Add another node + RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512)); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + createSchedulingRequest(512, "queue1", "user1", 2); + + scheduler.update(); + + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, + new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()); + scheduler.handle(updateEvent); + + assertEquals(512, scheduler.getQueueManager().getQueue("queue1"). + getQueueSchedulable().getResourceUsage().getMemory()); + + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, + new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()); + scheduler.handle(updateEvent2); + + assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). + getQueueSchedulable().getResourceUsage().getMemory()); + } + + @Test + public void testSimpleContainerReservation() throws InterruptedException { + // Add a node + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Queue 1 requests full capacity of node + createSchedulingRequest(1024, "queue1", "user1", 1); + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1, + new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()); + scheduler.handle(updateEvent); + + // Make sure queue 1 is allocated app capacity + assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). + getQueueSchedulable().getResourceUsage().getMemory()); + + // Now queue 2 requests likewise + ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); + scheduler.update(); + scheduler.handle(updateEvent); + + // Make sure queue 2 is waiting with a reservation + assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). + getQueueSchedulable().getResourceUsage().getMemory()); + assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory()); + + // Now another node checks in with capacity + RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024)); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, + new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()); + scheduler.handle(nodeEvent2); + scheduler.handle(updateEvent2); + + // Make sure this goes to queue 2 + assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). + getQueueSchedulable().getResourceUsage().getMemory()); + + // The old reservation should still be there... + assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory()); + // ... but it should disappear when we update the first node. 
+ scheduler.handle(updateEvent); + assertEquals(0, scheduler.applications.get(attId).getCurrentReservation().getMemory()); + + } + + @Test + public void testUserAsDefaultQueue() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.reinitialize(conf, null, resourceManager.getRMContext()); + AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent( + createAppAttemptId(1, 1), "default", "user1"); + scheduler.handle(appAddedEvent); + assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size()); + assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size()); + + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); + scheduler.reinitialize(conf, null, resourceManager.getRMContext()); + AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent( + createAppAttemptId(2, 1), "default", "user2"); + scheduler.handle(appAddedEvent2); + assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size()); + assertEquals(1, scheduler.getQueueManager().getQueue("default").getApplications().size()); + assertEquals(0, scheduler.getQueueManager().getQueue("user2").getApplications().size()); + } + + @Test + public void testFairShareWithMinAlloc() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.reinitialize(conf, null, resourceManager.getRMContext()); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<minResources>1024</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<minResources>2048</minResources>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + + QueueManager queueManager = scheduler.getQueueManager(); + queueManager.initialize(); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + createSchedulingRequest(2 * 1024, "queueA", "user1"); + createSchedulingRequest(2 * 1024, "queueB", "user1"); + + scheduler.update(); + + Collection<FSQueue> queues = scheduler.getQueueManager().getQueues(); + assertEquals(3, queues.size()); + + for (FSQueue p : queues) { + if (p.getName().equals("queueA")) { + assertEquals(1024, p.getQueueSchedulable().getFairShare().getMemory()); + } + else if (p.getName().equals("queueB")) { + assertEquals(2048, p.getQueueSchedulable().getFairShare().getMemory()); + } + } + + } + + /** + * Make allocation requests and ensure they are reflected in queue demand. 
+ */ + @Test + public void testQueueDemandCalculation() throws Exception { + ApplicationAttemptId id11 = createAppAttemptId(1, 1); + scheduler.addApplication(id11, "queue1", "user1"); + ApplicationAttemptId id21 = createAppAttemptId(2, 1); + scheduler.addApplication(id21, "queue2", "user1"); + ApplicationAttemptId id22 = createAppAttemptId(2, 2); + scheduler.addApplication(id22, "queue2", "user1"); + + // First ask, queue1 requests 1024 + List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>(); + ResourceRequest request1 = createResourceRequest(1024, "*", 1, 1); + ask1.add(request1); + scheduler.allocate(id11, ask1, new ArrayList<ContainerId>()); + + // Second ask, queue2 requests 1024 + (2 * 512) + List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>(); + ResourceRequest request2 = createResourceRequest(1024, "foo", 1, 1); + ResourceRequest request3 = createResourceRequest(512, "bar", 1, 2); + ask2.add(request2); + ask2.add(request3); + scheduler.allocate(id21, ask2, new ArrayList<ContainerId>()); + + // Third ask, queue2 requests 1024 + List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>(); + ResourceRequest request4 = createResourceRequest(1024, "*", 1, 1); + ask3.add(request4); + scheduler.allocate(id22, ask3, new ArrayList<ContainerId>()); + + scheduler.update(); + + assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").getQueueSchedulable().getDemand().getMemory()); + assertEquals(1024 + 1024 + (2 * 512), scheduler.getQueueManager().getQueue("queue2").getQueueSchedulable().getDemand().getMemory()); + + } + + @Test + public void testAppAdditionAndRemoval() throws Exception { + AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent( + createAppAttemptId(1, 1), "default", "user1"); + scheduler.handle(appAddedEvent1); + + // Scheduler should have one queue (the default) + assertEquals(1, scheduler.getQueueManager().getQueues().size()); + + // That queue should have one app + assertEquals(1, scheduler.getQueueManager().getQueue("default").getApplications().size()); + + AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent( + createAppAttemptId(1, 1), RMAppAttemptState.FINISHED); + + // Now remove app + scheduler.handle(appRemovedEvent1); + + // Default queue should have no apps + assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size()); + } + + @Test + public void testAllocationFileParsing() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.reinitialize(conf, null, resourceManager.getRMContext()); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + // Give queue A a minimum of 1024 M + out.println("<queue name=\"queueA\">"); + out.println("<minResources>1024</minResources>"); + out.println("</queue>"); + // Give queue B a minimum of 2048 M + out.println("<queue name=\"queueB\">"); + out.println("<minResources>2048</minResources>"); + out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>"); + out.println("</queue>"); + // Give queue C no minimum + out.println("<queue name=\"queueC\">"); + out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>"); + out.println("</queue>"); + // Give queue D a limit of 3 running apps + out.println("<queue name=\"queueD\">"); + out.println("<maxRunningApps>3</maxRunningApps>"); + out.println("</queue>"); + // Give queue E a preemption timeout of one minute + 
out.println("<queue name=\"queueE\">"); + out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>"); + out.println("</queue>"); + // Set default limit of apps per queue to 15 + out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>"); + // Set default limit of apps per user to 5 + out.println("<userMaxAppsDefault>5</userMaxAppsDefault>"); + // Give user1 a limit of 10 jobs + out.println("<user name=\"user1\">"); + out.println("<maxRunningApps>10</maxRunningApps>"); + out.println("</user>"); + // Set default min share preemption timeout to 2 minutes + out.println("<defaultMinSharePreemptionTimeout>120" + + "</defaultMinSharePreemptionTimeout>"); + // Set fair share preemption timeout to 5 minutes + out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>"); + out.println("</allocations>"); + out.close(); + + QueueManager queueManager = scheduler.getQueueManager(); + queueManager.initialize(); + + assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue + assertEquals(Resources.createResource(0), + queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(Resources.createResource(0), + queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); + + assertEquals(Resources.createResource(1024), + queueManager.getMinResources("queueA")); + assertEquals(Resources.createResource(2048), + queueManager.getMinResources("queueB")); + assertEquals(Resources.createResource(0), + queueManager.getMinResources("queueC")); + assertEquals(Resources.createResource(0), + queueManager.getMinResources("queueD")); + assertEquals(Resources.createResource(0), + queueManager.getMinResources("queueE")); + + assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(15, queueManager.getQueueMaxApps("queueA")); + assertEquals(15, queueManager.getQueueMaxApps("queueB")); + assertEquals(15, queueManager.getQueueMaxApps("queueC")); + assertEquals(3, queueManager.getQueueMaxApps("queueD")); + assertEquals(15, queueManager.getQueueMaxApps("queueE")); + assertEquals(10, queueManager.getUserMaxApps("user1")); + assertEquals(5, queueManager.getUserMaxApps("user2")); + + // Unspecified queues should get default ACL + Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA"); + assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE)); + assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString()); + assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS)); + assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); + + // Queue B ACL + Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB"); + assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE)); + assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString()); + + // Queue c ACL + Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC"); + assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS)); + assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); + + assertEquals(120000, queueManager.getMinSharePreemptionTimeout( + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD")); + assertEquals(120000, 
queueManager.getMinSharePreemptionTimeout("queueA")); + assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE")); + assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); + } + + @Test + public void testBackwardsCompatibleAllocationFileParsing() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.reinitialize(conf, null, resourceManager.getRMContext()); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + // Give queue A a minimum of 1024 M + out.println("<pool name=\"queueA\">"); + out.println("<minResources>1024</minResources>"); + out.println("</pool>"); + // Give queue B a minimum of 2048 M + out.println("<pool name=\"queueB\">"); + out.println("<minResources>2048</minResources>"); + out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>"); + out.println("</pool>"); + // Give queue C no minimum + out.println("<pool name=\"queueC\">"); + out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>"); + out.println("</pool>"); + // Give queue D a limit of 3 running apps + out.println("<pool name=\"queueD\">"); + out.println("<maxRunningApps>3</maxRunningApps>"); + out.println("</pool>"); + // Give queue E a preemption timeout of one minute + out.println("<pool name=\"queueE\">"); + out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>"); + out.println("</pool>"); + // Set default limit of apps per queue to 15 + out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>"); + // Set default limit of apps per user to 5 + out.println("<userMaxAppsDefault>5</userMaxAppsDefault>"); + // Give user1 a limit of 10 jobs + out.println("<user name=\"user1\">"); + out.println("<maxRunningApps>10</maxRunningApps>"); + out.println("</user>"); + // Set default min share preemption timeout to 2 minutes + out.println("<defaultMinSharePreemptionTimeout>120" + + "</defaultMinSharePreemptionTimeout>"); + // Set fair share preemption timeout to 5 minutes + out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>"); + out.println("</allocations>"); + out.close(); + + QueueManager queueManager = scheduler.getQueueManager(); + queueManager.initialize(); + + assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue + assertEquals(Resources.createResource(0), + queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(Resources.createResource(0), + queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); + + assertEquals(Resources.createResource(1024), + queueManager.getMinResources("queueA")); + assertEquals(Resources.createResource(2048), + queueManager.getMinResources("queueB")); + assertEquals(Resources.createResource(0), + queueManager.getMinResources("queueC")); + assertEquals(Resources.createResource(0), + queueManager.getMinResources("queueD")); + assertEquals(Resources.createResource(0), + queueManager.getMinResources("queueE")); + + assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(15, queueManager.getQueueMaxApps("queueA")); + assertEquals(15, queueManager.getQueueMaxApps("queueB")); + assertEquals(15, queueManager.getQueueMaxApps("queueC")); + assertEquals(3, queueManager.getQueueMaxApps("queueD")); + assertEquals(15, queueManager.getQueueMaxApps("queueE")); + assertEquals(10, queueManager.getUserMaxApps("user1")); + assertEquals(5, 
queueManager.getUserMaxApps("user2")); + + // Unspecified queues should get default ACL + Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA"); + assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE)); + assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString()); + assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS)); + assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); + + // Queue B ACL + Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB"); + assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE)); + assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString()); + + // Queue c ACL + Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC"); + assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS)); + assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); + + assertEquals(120000, queueManager.getMinSharePreemptionTimeout( + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); + assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE")); + assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); + } + + @Test + public void testIsStarvedForMinShare() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.reinitialize(conf, null, resourceManager.getRMContext()); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<minResources>2048</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<minResources>2048</minResources>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + + QueueManager queueManager = scheduler.getQueueManager(); + queueManager.initialize(); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Queue A wants 3 * 1024. Node update gives this all to A + createSchedulingRequest(3 * 1024, "queueA", "user1"); + scheduler.update(); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1, + new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); + scheduler.handle(nodeEvent2); + + // Queue B arrives and wants 1 * 1024 + createSchedulingRequest(1 * 1024, "queueB", "user1"); + scheduler.update(); + Collection<FSQueue> queues = scheduler.getQueueManager().getQueues(); + assertEquals(3, queues.size()); + + // Queue A should be above min share, B below. 
+ for (FSQueue p : queues) { + if (p.getName().equals("queueA")) { + assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable())); + } + else if (p.getName().equals("queueB")) { + assertEquals(true, scheduler.isStarvedForMinShare(p.getQueueSchedulable())); + } + } + + // Node checks in again, should allocate for B + scheduler.handle(nodeEvent2); + // Now B should have min share ( = demand here) + for (FSQueue p : queues) { + if (p.getName().equals("queueB")) { + assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable())); + } + } + } + + @Test + public void testIsStarvedForFairShare() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.reinitialize(conf, null, resourceManager.getRMContext()); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>.25</weight>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>.75</weight>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + + QueueManager queueManager = scheduler.getQueueManager(); + queueManager.initialize(); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Queue A wants 3 * 1024. Node update gives this all to A + createSchedulingRequest(3 * 1024, "queueA", "user1"); + scheduler.update(); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1, + new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); + scheduler.handle(nodeEvent2); + + // Queue B arrives and wants 1 * 1024 + createSchedulingRequest(1 * 1024, "queueB", "user1"); + scheduler.update(); + Collection<FSQueue> queues = scheduler.getQueueManager().getQueues(); + assertEquals(3, queues.size()); + + // Queue A should be above fair share, B below. + for (FSQueue p : queues) { + if (p.getName().equals("queueA")) { + assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable())); + } + else if (p.getName().equals("queueB")) { + assertEquals(true, scheduler.isStarvedForFairShare(p.getQueueSchedulable())); + } + } + + // Node checks in again, should allocate for B + scheduler.handle(nodeEvent2); + // B should not be starved for fair share, since entire demand is + // satisfied. + for (FSQueue p : queues) { + if (p.getName().equals("queueB")) { + assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable())); + } + } + } + + @Test + /** + * Make sure containers are chosen to be preempted in the correct order. Right + * now this means decreasing order of priority. 
+ */ + public void testChoiceOfPreemptedContainers() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.reinitialize(conf, null, resourceManager.getRMContext()); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>.25</weight>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>.25</weight>"); + out.println("</queue>"); + out.println("<queue name=\"queueC\">"); + out.println("<weight>.25</weight>"); + out.println("</queue>"); + out.println("<queue name=\"queueD\">"); + out.println("<weight>.25</weight>"); + out.println("</queue>"); + out.println("</allocations>"); + out.close(); + + QueueManager queueManager = scheduler.getQueueManager(); + queueManager.initialize(); + + // Create three nodes + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024)); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024)); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + + // Queue A and B each request three containers + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); + + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); + ApplicationAttemptId app5 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); + ApplicationAttemptId app6 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 2; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, + new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2, + new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3, + new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); + scheduler.handle(nodeUpdate3); + } + + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app3).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app6).getLiveContainers().size()); + + // Now new requests arrive from queues C and D + ApplicationAttemptId app7 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); + ApplicationAttemptId app8 = + createSchedulingRequest(1 * 1024,
"queueC", "user1", 1, 2); + ApplicationAttemptId app9 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); + + ApplicationAttemptId app10 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1); + ApplicationAttemptId app11 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2); + ApplicationAttemptId app12 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3); + + scheduler.update(); + + // We should be able to claw back one container from A and B each. + // Make sure it is lowest priority container. + scheduler.preemptResources(scheduler.getQueueSchedulables(), + Resources.createResource(2 * 1024)); + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); + + // We should be able to claw back another container from A and B each. + // Make sure it is lowest priority container. + scheduler.preemptResources(scheduler.getQueueSchedulables(), + Resources.createResource(2 * 1024)); + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); + + // Now A and B are below fair share, so preemption shouldn't do anything + scheduler.preemptResources(scheduler.getQueueSchedulables(), + Resources.createResource(2 * 1024)); + assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size()); + assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size()); + assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); + } + + @Test + /** + * Tests the timing of decision to preempt tasks. 
+ */ + public void testPreemptionDecision() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + MockClock clock = new MockClock(); + scheduler.setClock(clock); + scheduler.reinitialize(conf, null, resourceManager.getRMContext()); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"queueA\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueC\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024</minResources>"); + out.println("</queue>"); + out.println("<queue name=\"queueD\">"); + out.println("<weight>.25</weight>"); + out.println("<minResources>1024</minResources>"); + out.println("</queue>"); + out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>"); + out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>"); + out.println("</allocations>"); + out.close(); + + QueueManager queueManager = scheduler.getQueueManager(); + queueManager.initialize(); + + // Create three nodes + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024)); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024)); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + + // Queue A and B each request three containers + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); + + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); + ApplicationAttemptId app5 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); + ApplicationAttemptId app6 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 2; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1, + new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2, + new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3, + new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>()); + scheduler.handle(nodeUpdate3); + } + + // Now new requests arrive from queues C and D + ApplicationAttemptId app7 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); + ApplicationAttemptId app8 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); + ApplicationAttemptId app9 = + createSchedulingRequest(1
* 1024, "queueC", "user1", 1, 3); + + ApplicationAttemptId app10 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1); + ApplicationAttemptId app11 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2); + ApplicationAttemptId app12 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3); + + scheduler.update(); + + FSQueueSchedulable schedC = + scheduler.getQueueManager().getQueue("queueC").getQueueSchedulable(); + FSQueueSchedulable schedD = + scheduler.getQueueManager().getQueue("queueD").getQueueSchedulable(); + + assertTrue(Resources.equals( + Resources.none(), scheduler.resToPreempt(schedC, clock.getTime()))); + assertTrue(Resources.equals( + Resources.none(), scheduler.resToPreempt(schedD, clock.getTime()))); + // After minSharePreemptionTime has passed, they should want to preempt min + // share. + clock.tick(6); + assertTrue(Resources.equals( + Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime()))); + assertTrue(Resources.equals( + Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime()))); + + // After fairSharePreemptionTime has passed, they should want to preempt + // fair share. + scheduler.update(); + clock.tick(6); + assertTrue(Resources.equals( + Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime()))); + assertTrue(Resources.equals( + Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime()))); + } +}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Fri Jul 13 00:43:01 2012
@@ -0,0 +1,179 @@
+~~ Licensed under the Apache License, Version 2.0 (the "License");
+~~ you may not use this file except in compliance with the License.
+~~ You may obtain a copy of the License at
+~~
+~~ http://www.apache.org/licenses/LICENSE-2.0
+~~
+~~ Unless required by applicable law or agreed to in writing, software
+~~ distributed under the License is distributed on an "AS IS" BASIS,
+~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~~ See the License for the specific language governing permissions and
+~~ limitations under the License. See accompanying LICENSE file.
+
+  ---
+  Hadoop Map Reduce Next Generation-${project.version} - Fair Scheduler
+  ---
+  ---
+  ${maven.build.timestamp}
+
+Hadoop MapReduce Next Generation - Fair Scheduler
+
+  \[ {{{./index.html}Go Back}} \]
+
+%{toc|section=1|fromDepth=0}
+
+* {Purpose}
+
+  This document describes the <<<FairScheduler>>>, a pluggable scheduler for
+  Hadoop which provides a way to share large clusters. <<NOTE:>> The Fair
+  Scheduler implementation is currently under development and should be
+  considered experimental.
+
+* {Introduction}
+
+  Fair scheduling is a method of assigning resources to applications such that
+  all apps get, on average, an equal share of resources over time.
+  Hadoop NextGen is capable of scheduling multiple resource types, such as
+  memory and CPU. Currently only memory is supported, so a "cluster share" is
+  a proportion of aggregate memory in the cluster. When there is a single app
+  running, that app uses the entire cluster. When other apps are submitted,
+  resources that free up are assigned to the new apps, so that each app gets
+  roughly the same amount of resources. Unlike the default Hadoop scheduler,
+  which forms a queue of apps, this lets short apps finish in reasonable time
+  while not starving long-lived apps. It is also a reasonable way to share a
+  cluster between a number of users. Finally, fair sharing can also work with
+  app priorities - the priorities are used as weights to determine the
+  fraction of total resources that each app should get.
+
+  The scheduler organizes apps further into "queues", and shares resources
+  fairly between these queues. By default, all users share a single queue,
+  called "default". If an app specifically lists a queue in a container
+  resource request, the request is submitted to that queue. It is also
+  possible to assign queues based on the user name included with the request
+  through configuration. Within each queue, fair sharing is used to share
+  capacity between the running apps. Queues can also be given weights in the
+  config file to share the cluster non-proportionally.
+
+  In addition to providing fair sharing, the Fair Scheduler allows assigning
+  guaranteed minimum shares to queues, which is useful for ensuring that
+  certain users, groups or production applications always get sufficient
+  resources.
+  When a queue contains apps, it gets at least its minimum share,
+  but when the queue does not need its full guaranteed share, the excess is
+  split between other running apps. This lets the scheduler guarantee capacity
+  for queues while utilizing resources efficiently when these queues don't
+  contain applications.
+
+  The Fair Scheduler lets all apps run by default, but it is also possible to
+  limit the number of running apps per user and per queue through the config
+  file. This can be useful when a user must submit hundreds of apps at once,
+  or in general to improve performance if running too many apps at once would
+  cause too much intermediate data to be created or too much context-switching.
+  Limiting the apps does not cause any subsequently submitted apps to fail,
+  only to wait in the scheduler's queue until some of the user's earlier apps
+  finish. Apps to run from each user/queue are chosen in order of priority and
+  then submit time, as in the default FIFO scheduler in Hadoop.
+
+  Certain add-ons that existed in the original (MR1) Fair Scheduler are not
+  yet supported. Among them is the use of custom policies governing priority
+  "boosting" of certain apps.
+
+* {Installation}
+
+  To use the Fair Scheduler, first assign the appropriate scheduler class in
+  yarn-site.xml:
+
+------
+<property>
+  <name>yarn.resourcemanager.scheduler.class</name>
+  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
+</property>
+------
+
+* {Configuration}
+
+  Customizing the Fair Scheduler typically involves altering two files. First,
+  scheduler-wide options can be set by adding configuration properties in the
+  fair-scheduler.xml file in your existing configuration directory. Second, in
+  most cases users will want to create a manifest file listing which queues
+  exist and their respective weights and capacities. The location of this file
+  is flexible, but it must be declared in fair-scheduler.xml.
+
+  * <<<yarn.scheduler.fair.allocation.file>>>
+
+    * Path to the allocation file. An allocation file is an XML manifest
+      describing queues and their properties, in addition to certain policy
+      defaults. Its format is described in the next section.
+
+  * <<<yarn.scheduler.fair.minimum-allocation-mb>>>
+
+    * The smallest container size the scheduler can allocate, in MB of memory.
+
+  * <<<yarn.scheduler.fair.maximum-allocation-mb>>>
+
+    * The largest container the scheduler can allocate, in MB of memory.
+
+  * <<<yarn.scheduler.fair.user-as-default-queue>>>
+
+    * Whether to use the username associated with the allocation as the default
+      queue name, in the event that a queue name is not specified. If this is
+      set to "false" or unset, all jobs have a shared default queue, called
+      "default".
+
+  * <<<yarn.scheduler.fair.preemption>>>
+
+    * Whether to use preemption. Note that preemption is experimental in the
+      current version.
+
+  * <<<yarn.scheduler.fair.sizebasedweight>>>
+
+    * Whether to assign shares to individual apps based on their size, rather
+      than providing an equal share to all apps regardless of size.
+
+  * <<<yarn.scheduler.fair.assignmultiple>>>
+
+    * Whether to allow multiple container assignments in one heartbeat.
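+
+  For illustration, a fair-scheduler.xml fragment that sets a few of these
+  options might look as follows. This is a sketch that assumes the standard
+  Hadoop configuration XML format; the values shown are placeholders rather
+  than recommended settings.
+
+------
+<configuration>
+  <!-- Placeholder values for illustration only -->
+  <property>
+    <name>yarn.scheduler.fair.allocation.file</name>
+    <value>/etc/hadoop/fair-scheduler-allocations.xml</value>
+  </property>
+  <property>
+    <name>yarn.scheduler.fair.user-as-default-queue</name>
+    <value>true</value>
+  </property>
+  <property>
+    <name>yarn.scheduler.fair.assignmultiple</name>
+    <value>false</value>
+  </property>
+</configuration>
+------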
+
+Allocation file format
+
+  The allocation file must be in XML format. The format contains three types
+  of elements:
+
+  * <<Queue elements>>, which represent queues. Each may contain the following
+    properties:
+
+    * minResources: minimum amount of aggregate memory
+
+    * maxResources: maximum amount of aggregate memory
+
+    * maxRunningApps: limit the number of apps from the queue to run at once
+
+    * weight: to share the cluster non-proportionally with other queues
+
+    * schedulingMode: either "fifo" or "fair" depending on the in-queue
+      scheduling policy desired
+
+  * <<User elements>>, which represent settings governing the behavior of
+    individual users. They can contain a single property: maxRunningApps, a
+    limit on the number of running apps for a particular user.
+
+  * <<A userMaxAppsDefault element>>, which sets the default running app limit
+    for any users whose limit is not otherwise specified.
+
+  An example allocation file is given here:
+
+---
+<?xml version="1.0"?>
+<allocations>
+  <queue name="sample_queue">
+    <minResources>100000</minResources>
+    <maxResources>900000</maxResources>
+    <maxRunningApps>50</maxRunningApps>
+    <weight>2.0</weight>
+    <schedulingMode>fair</schedulingMode>
+  </queue>
+  <user name="sample_user">
+    <maxRunningApps>30</maxRunningApps>
+  </user>
+  <userMaxAppsDefault>5</userMaxAppsDefault>
+</allocations>
+---
+
+  Note that for backwards compatibility with the original FairScheduler,
+  "queue" elements can instead be named "pool" elements, as in the sketch
+  below.
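+
+  For illustration, the same example written with the older "pool" naming
+  should be read identically by the scheduler:
+
+---
+<?xml version="1.0"?>
+<allocations>
+  <!-- Equivalent to the "queue"-based example above, per the
+       backwards-compatibility note -->
+  <pool name="sample_queue">
+    <minResources>100000</minResources>
+    <maxResources>900000</maxResources>
+    <maxRunningApps>50</maxRunningApps>
+    <weight>2.0</weight>
+    <schedulingMode>fair</schedulingMode>
+  </pool>
+  <user name="sample_user">
+    <maxRunningApps>30</maxRunningApps>
+  </user>
+  <userMaxAppsDefault>5</userMaxAppsDefault>
+</allocations>
+---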