YARN-5819. [YARN-4752] Verify fairshare and minshare preemption (Contributed by Karthik Kambatla via Daniel Templeton)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a69763b8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a69763b8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a69763b8 Branch: refs/heads/YARN-4752 Commit: a69763b81a439b6df2ed0c164f83fbabf3383cfa Parents: 066576c Author: Daniel Templeton <templ...@apache.org> Authored: Tue Nov 15 09:17:56 2016 -0800 Committer: Karthik Kambatla <ka...@cloudera.com> Committed: Wed Nov 16 17:58:36 2016 -0800 ---------------------------------------------------------------------- .../scheduler/fair/FSAppAttempt.java | 38 ++- .../scheduler/fair/FSPreemptionThread.java | 23 +- .../scheduler/fair/FSSchedulerNode.java | 34 +++ .../scheduler/fair/FairSchedulerTestBase.java | 37 ++- .../scheduler/fair/TestFSAppStarvation.java | 21 +- .../fair/TestFairSchedulerPreemption.java | 275 +++++++++++++++++++ 6 files changed, 389 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 6b88bd0..f5bc2cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -83,7 +83,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Preemption related variables private Resource fairshareStarvation = Resources.none(); private Resource minshareStarvation = Resources.none(); - private Resource preemptedResources = Resources.createResource(0); + private final Resource preemptedResources = Resources.clone(Resources.none()); private final Set<RMContainer> containersToPreempt = new HashSet<>(); private long lastTimeAtFairShare; @@ -149,7 +149,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Remove from the list of containers liveContainers.remove(rmContainer.getContainerId()); - containersToPreempt.remove(rmContainer); + removePreemption(rmContainer); Resource containerResource = rmContainer.getContainer().getResource(); RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, @@ -524,7 +524,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt void addPreemption(RMContainer container) { containersToPreempt.add(container); - Resources.addTo(preemptedResources, container.getAllocatedResource()); + synchronized (preemptedResources) { + Resources.addTo(preemptedResources, container.getAllocatedResource()); + } + } + + void removePreemption(RMContainer container) { + synchronized (preemptedResources) { + Resources.subtractFrom(preemptedResources, + container.getAllocatedResource()); + } + containersToPreempt.remove(container); } Set<RMContainer> getPreemptionContainers() { @@ -533,7 +543,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt private Resource getPreemptedResources() { - return preemptedResources; + synchronized (preemptedResources) { + return preemptedResources; + } } boolean canContainerBePreempted(RMContainer container) { @@ -545,20 +557,30 @@ public class FSAppAttempt 
extends SchedulerApplicationAttempt return false; } + if (containersToPreempt.contains(container)) { + // The container is already under consideration for preemption + return false; + } + // Check if any of the parent queues are not preemptable // TODO (KK): Propagate the "preemptable" flag all the way down to the app // to avoid recursing up every time. - FSQueue queue = getQueue(); - while (!queue.getQueueName().equals("root")) { - if (!queue.isPreemptable()) { + for (FSQueue q = getQueue(); + !q.getQueueName().equals("root"); + q = q.getParent()) { + if (!q.isPreemptable()) { return false; } } // Check if the app's allocation will be over its fairshare even // after preempting this container + Resource currentUsage = getResourceUsage(); + Resource fairshare = getFairShare(); + Resource overFairShareBy = Resources.subtract(currentUsage, fairshare); + return (Resources.fitsIn(container.getAllocatedResource(), - Resources.subtract(getResourceUsage(), getFairShare()))); + overFairShareBy)); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index 905b6f2..01c830c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -83,8 +83,8 @@ public class FSPreemptionThread extends Thread { * @param starvedApp * @return */ - private List<RMContainer> identifyContainersToPreempt(FSAppAttempt - starvedApp) { + private List<RMContainer> identifyContainersToPreempt( + FSAppAttempt starvedApp) { List<RMContainer> containers = new ArrayList<>(); // return value // Find the nodes that match the next resource request @@ -113,20 +113,29 @@ public class FSPreemptionThread extends Thread { // is okay to unreserve it if we find enough resources. } + // Figure out list of containers to consider + List<RMContainer> containersToCheck = + node.getCopiedListOfRunningContainers(); + containersToCheck.removeAll(node.getContainersForPreemption()); + // Initialize potential with unallocated resources Resource potential = Resources.clone(node.getUnallocatedResource()); - for (RMContainer container : node.getCopiedListOfRunningContainers()) { + for (RMContainer container : containersToCheck) { FSAppAttempt app = scheduler.getSchedulerApp(container.getApplicationAttemptId()); if (app.canContainerBePreempted(container)) { + // Flag container for preemption + containers.add(container); Resources.addTo(potential, container.getAllocatedResource()); } // Check if we have already identified enough containers if (Resources.fitsIn(requestCapability, potential)) { - // TODO (KK): Reserve containers so they can't be taken by another - // app + // Mark the containers as being considered for preemption on the node. + // Make sure the containers are subsequently removed by calling + // FSSchedulerNode#removeContainerForPreemption. 
+ node.addContainersForPreemption(containers); return containers; } } @@ -166,6 +175,10 @@ public class FSPreemptionThread extends Thread { LOG.info("Killing container " + container); scheduler.completedContainer( container, status, RMContainerEventType.KILL); + + FSSchedulerNode containerNode = (FSSchedulerNode) + scheduler.getNodeTracker().getNode(container.getAllocatedNode()); + containerNode.removeContainerForPreemption(container); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 024ec67..a605af6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -29,6 +29,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; + @Private @Unstable public class FSSchedulerNode extends SchedulerNode { @@ -36,6 +40,8 
@@ public class FSSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class); private FSAppAttempt reservedAppSchedulable; + private final Set<RMContainer> containersForPreemption = + new ConcurrentSkipListSet<>(); public FSSchedulerNode(RMNode node, boolean usePortForNodeName) { super(node, usePortForNodeName); @@ -103,4 +109,32 @@ public class FSSchedulerNode extends SchedulerNode { return reservedAppSchedulable; } + /** + * Mark {@code containers} as being considered for preemption so they are + * not considered again. A call to this requires a corresponding call to + * {@link #removeContainerForPreemption}; otherwise a marked container + * would never be considered for preemption again and its entry would + * leak memory. + * + * @param containers containers to mark + */ + public void addContainersForPreemption(Collection<RMContainer> containers) { + containersForPreemption.addAll(containers); + } + + /** + * @return set of containers marked for preemption. + */ + public Set<RMContainer> getContainersForPreemption() { + return containersForPreemption; + } + + /** + * Remove container from the set of containers marked for preemption.
+ * + * @param container container to remove + */ + public void removeContainerForPreemption(RMContainer container) { + containersForPreemption.remove(container); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 6a308a1..992b75d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -17,14 +17,6 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import org.junit.Assert; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -39,7 +31,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import 
org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -50,9 +44,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.io.File; +import java.util.ArrayList; +import java.util.List; + public class FairSchedulerTestBase { public final static String TEST_DIR = new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); @@ -70,6 +72,11 @@ public class FairSchedulerTestBase { private static final int SLEEP_DURATION = 10; private static final int SLEEP_RETRIES = 1000; + /** + * The list of nodes added to the cluster using the {@link #addNode} method. 
+ */ + protected final List<RMNode> rmNodes = new ArrayList<>(); + // Helper methods public Configuration createConfiguration() { conf = new YarnConfiguration(); @@ -280,4 +287,18 @@ public class FairSchedulerTestBase { Assert.assertEquals(resource.getVirtualCores(), app.getCurrentConsumption().getVirtualCores()); } + + /** + * Add a node to the cluster and track the nodes in {@link #rmNodes}. + * @param memory memory capacity of the node + * @param cores cpu capacity of the node + */ + protected void addNode(int memory, int cores) { + int id = rmNodes.size() + 1; + RMNode node = + MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id, + "127.0.0." + id); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + rmNodes.add(node); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java index 0e5511b..323152d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java @@ -19,13 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -38,8 +35,6 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; /** * Test class to verify identification of app starvation @@ -47,7 +42,6 @@ import java.util.List; public class TestFSAppStarvation extends FairSchedulerTestBase { private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues"); - private final List<RMNode> rmNodes = new ArrayList<>(); // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore) private static final int NODE_CAPACITY_MULTIPLE = 4; @@ -130,7 +124,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { private void setupClusterAndSubmitJobs() throws Exception { setupStarvedCluster(); submitAppsToEachLeafQueue(); - sendNodeUpdateEvents(); + sendEnoughNodeUpdatesToAssignFully(); // Sleep to hit the preemption timeouts Thread.sleep(10); @@ -211,7 +205,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { = createSchedulingRequest(1024, 1, "root.default", "default", 8); scheduler.update(); - sendNodeUpdateEvents(); + sendEnoughNodeUpdatesToAssignFully(); assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size()); } @@ -224,16 +218,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { scheduler.update(); } - 
private void addNode(int memory, int cores) { - int id = rmNodes.size() + 1; - RMNode node = - MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id, - "127.0.0." + id); - scheduler.handle(new NodeAddedSchedulerEvent(node)); - rmNodes.add(node); - } - - private void sendNodeUpdateEvents() { + private void sendEnoughNodeUpdatesToAssignFully() { for (RMNode node : rmNodes) { NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent = new NodeUpdateSchedulerEvent(node); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java new file mode 100644 index 0000000..36ee685 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.junit.After; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Arrays; +import java.util.Collection; + +/** + * Tests to verify fairshare and minshare preemption, using parameterization. 
+ */ +@RunWith(Parameterized.class) +public class TestFairSchedulerPreemption extends FairSchedulerTestBase { + private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues"); + + // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore) + private static final int NODE_CAPACITY_MULTIPLE = 4; + + private final boolean fairsharePreemption; + + // App that takes up the entire cluster + private FSAppAttempt greedyApp; + + // Starving app that is expected to instigate preemption + private FSAppAttempt starvingApp; + + @Parameterized.Parameters + public static Collection<Boolean[]> getParameters() { + return Arrays.asList(new Boolean[][] { + {true}, {false}}); + } + + public TestFairSchedulerPreemption(Boolean fairshare) throws IOException { + fairsharePreemption = fairshare; + writeAllocFile(); + } + + @Before + public void setup() { + createConfiguration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, + ALLOC_FILE.getAbsolutePath()); + conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true); + conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); + conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0); + } + + @After + public void teardown() { + ALLOC_FILE.delete(); + conf = null; + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + } + + private void writeAllocFile() throws IOException { + /* + * Queue hierarchy: + * root + * |--- preemptable + * |--- child-1 + * |--- child-2 + * |--- nonpreemptable + * |--- child-1 + * |--- child-2 + */ + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + + out.println("<queue name=\"preemptable\">"); + writePreemptionParams(out); + + // Child-1 + out.println("<queue name=\"child-1\">"); + writeResourceParams(out); + out.println("</queue>"); + + // Child-2 + out.println("<queue name=\"child-2\">"); + writeResourceParams(out); + out.println("</queue>"); +
out.println("</queue>"); // end of preemptable queue + + // Queue with preemption disallowed + out.println("<queue name=\"nonpreemptable\">"); + out.println("<allowPreemptionFrom>false" + + "</allowPreemptionFrom>"); + writePreemptionParams(out); + + // Child-1 + out.println("<queue name=\"child-1\">"); + writeResourceParams(out); + out.println("</queue>"); + + // Child-2 + out.println("<queue name=\"child-2\">"); + writeResourceParams(out); + out.println("</queue>"); + + out.println("</queue>"); // end of nonpreemptable queue + + out.println("</allocations>"); + out.close(); + + assertTrue("Allocation file does not exist, not running the test", + ALLOC_FILE.exists()); + } + + private void writePreemptionParams(PrintWriter out) { + if (fairsharePreemption) { + out.println("<fairSharePreemptionThreshold>1" + + "</fairSharePreemptionThreshold>"); + out.println("<fairSharePreemptionTimeout>0" + + "</fairSharePreemptionTimeout>"); + } else { + out.println("<minSharePreemptionTimeout>0" + + "</minSharePreemptionTimeout>"); + } + } + + private void writeResourceParams(PrintWriter out) { + if (!fairsharePreemption) { + out.println("<minResources>4096mb,4vcores</minResources>"); + } + } + + private void setupCluster() throws IOException { + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + // Create and add two nodes to the cluster + addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE); + addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE); + } + + private void sendEnoughNodeUpdatesToAssignFully() { + for (RMNode node : rmNodes) { + NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent = + new NodeUpdateSchedulerEvent(node); + for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) { + scheduler.handle(nodeUpdateSchedulerEvent); + } + } + } + + /** + * Submit application to {@code queue1} and take over the entire cluster. 
+ * Submit application with larger containers to {@code queue2} that + * requires preemption from the first application. + * + * @param queue1 first queue + * @param queue2 second queue + * @throws InterruptedException if interrupted while waiting + */ + private void submitApps(String queue1, String queue2) + throws InterruptedException { + // Create an app that takes up all the resources on the cluster + ApplicationAttemptId appAttemptId1 + = createSchedulingRequest(1024, 1, queue1, "default", + NODE_CAPACITY_MULTIPLE * rmNodes.size()); + greedyApp = scheduler.getSchedulerApp(appAttemptId1); + scheduler.update(); + sendEnoughNodeUpdatesToAssignFully(); + assertEquals(8, greedyApp.getLiveContainers().size()); + + // Create a starving app with larger containers that needs preemption + ApplicationAttemptId appAttemptId2 + = createSchedulingRequest(2048, 2, queue2, "default", + NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2); + starvingApp = scheduler.getSchedulerApp(appAttemptId2); + + // Sleep long enough to pass the preemption timeouts + Thread.sleep(10); + + scheduler.update(); + } + + private void verifyPreemption() throws InterruptedException { + // Sleep long enough for four containers to be preempted. Note that the + // starved app must be queued four times for containers to be preempted. + for (int i = 0; i < 10000; i++) { + if (greedyApp.getLiveContainers().size() == 4) { + break; + } + Thread.sleep(10); + } + + // Verify the right number of containers is preempted from greedyApp + assertEquals(4, greedyApp.getLiveContainers().size()); + + sendEnoughNodeUpdatesToAssignFully(); + + // Verify the preempted containers are assigned to starvingApp + assertEquals(2, starvingApp.getLiveContainers().size()); + } + + private void verifyNoPreemption() throws InterruptedException { + // Sleep long enough to ensure not even one container is preempted.
+ for (int i = 0; i < 600; i++) { + if (greedyApp.getLiveContainers().size() != 8) { + break; + } + Thread.sleep(10); + } + assertEquals(8, greedyApp.getLiveContainers().size()); + } + + @Test + public void testPreemptionWithinSameLeafQueue() throws Exception { + setupCluster(); + String queue = "root.preemptable.child-1"; + submitApps(queue, queue); + if (fairsharePreemption) { + verifyPreemption(); + } else { + verifyNoPreemption(); + } + } + + @Test + public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception { + setupCluster(); + submitApps("root.preemptable.child-1", "root.preemptable.child-2"); + verifyPreemption(); + } + + @Test + public void testPreemptionBetweenNonSiblingQueues() throws Exception { + setupCluster(); + submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1"); + verifyPreemption(); + } + + @Test + public void testNoPreemptionFromDisallowedQueue() throws Exception { + setupCluster(); + submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1"); + verifyNoPreemption(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org