YARN-5885. [YARN-4752] Cleanup YARN-4752 for merge (Contributed by Karthik Kambatla via Daniel Templeton)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/59bc9d16 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/59bc9d16 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/59bc9d16 Branch: refs/heads/YARN-4752 Commit: 59bc9d16a8514d65e893f4f30979973c54d2b0a3 Parents: a69763b Author: Daniel Templeton <templ...@apache.org> Authored: Wed Nov 16 17:18:33 2016 -0800 Committer: Karthik Kambatla <ka...@cloudera.com> Committed: Wed Nov 16 17:58:36 2016 -0800 ---------------------------------------------------------------------- .../scheduler/fair/FSAppAttempt.java | 61 +++++-- .../scheduler/fair/FSLeafQueue.java | 37 ++++- .../scheduler/fair/FSPreemptionThread.java | 21 ++- .../scheduler/fair/FSSchedulerNode.java | 8 +- .../scheduler/fair/FairScheduler.java | 7 - .../scheduler/fair/TestFSAppStarvation.java | 48 ++++-- .../scheduler/fair/TestFSLeafQueue.java | 164 +------------------ .../fair/TestQueueManagerRealScheduler.java | 128 +++++++++++++++ 8 files changed, 257 insertions(+), 217 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index f5bc2cd..39f4a3d 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -81,12 +82,14 @@ public class FSAppAttempt extends SchedulerApplicationAttempt private Resource fairShare = Resources.createResource(0, 0); // Preemption related variables - private Resource fairshareStarvation = Resources.none(); - private Resource minshareStarvation = Resources.none(); private final Resource preemptedResources = Resources.clone(Resources.none()); private final Set<RMContainer> containersToPreempt = new HashSet<>(); + private Resource fairshareStarvation = Resources.none(); private long lastTimeAtFairShare; + // minShareStarvation attributed to this application by the leaf queue + private Resource minshareStarvation = Resources.none(); + // Used to record node reservation by an app. 
// Key = RackName, Value = Set of Nodes reserved by app on rack private Map<String, Set<String>> reservations = new HashMap<>(); @@ -149,7 +152,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // Remove from the list of containers liveContainers.remove(rmContainer.getContainerId()); - removePreemption(rmContainer); + untrackContainerForPreemption(rmContainer); Resource containerResource = rmContainer.getContainer().getResource(); RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, @@ -510,26 +513,42 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } // Preemption related methods + + /** + * Get overall starvation - fairshare and attributed minshare. + * + * @return total starvation attributed to this application + */ Resource getStarvation() { return Resources.add(fairshareStarvation, minshareStarvation); } + /** + * Set the minshare attributed to this application. To be called only from + * {@link FSLeafQueue#updateStarvedApps}. + * + * @param starvation minshare starvation attributed to this app + */ void setMinshareStarvation(Resource starvation) { this.minshareStarvation = starvation; } + /** + * Reset the minshare starvation attributed to this application. 
To be + * called only from {@link FSLeafQueue#updateStarvedApps} + */ void resetMinshareStarvation() { this.minshareStarvation = Resources.none(); } - void addPreemption(RMContainer container) { + void trackContainerForPreemption(RMContainer container) { containersToPreempt.add(container); synchronized (preemptedResources) { Resources.addTo(preemptedResources, container.getAllocatedResource()); } } - void removePreemption(RMContainer container) { + private void untrackContainerForPreemption(RMContainer container) { synchronized (preemptedResources) { Resources.subtractFrom(preemptedResources, container.getAllocatedResource()); @@ -540,7 +559,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt Set<RMContainer> getPreemptionContainers() { return containersToPreempt; } - private Resource getPreemptedResources() { synchronized (preemptedResources) { @@ -563,8 +581,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } // Check if any of the parent queues are not preemptable - // TODO (KK): Propagate the "preemptable" flag all the way down to the app - // to avoid recursing up every time. + // TODO (YARN-5831): Propagate the "preemptable" flag all the way down to + // the app to avoid recursing up every time. for (FSQueue q = getQueue(); !q.getQueueName().equals("root"); q = q.getParent()) { @@ -585,8 +603,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt /** * Create and return a container object reflecting an allocation for the - * given appliction on the given node with the given capability and + * given application on the given node with the given capability and * priority. + * * @param node Node * @param capability Capability * @param schedulerKey Scheduler Key @@ -1076,6 +1095,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt return appSchedulingInfo.getNextResourceRequest(); } + /** + * Helper method that captures if this app is identified to be starved. 
+ * @return true if the app is starved for fairshare, false otherwise + */ + @VisibleForTesting + boolean isStarvedForFairShare() { + return !Resources.isNone(fairshareStarvation); + } + /* Schedulable methods implementation */ @Override @@ -1105,14 +1133,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt @Override public Resource getResourceUsage() { - // Here the getPreemptedResources() always return zero, except in - // a preemption round - // In the common case where preempted resource is zero, return the - // current consumption Resource object directly without calling - // Resources.subtract which creates a new Resource object for each call. - return getPreemptedResources().equals(Resources.none()) ? - getCurrentConsumption() : - Resources.subtract(getCurrentConsumption(), getPreemptedResources()); + /* + * getPreemptedResources() returns zero, except when there are containers + * to preempt. Avoid creating an object in the common case. + */ + return getPreemptedResources().equals(Resources.none()) + ?
getCurrentConsumption() + : Resources.subtract(getCurrentConsumption(), getPreemptedResources()); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 3fcf627..343e9c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -49,8 +49,9 @@ import static org.apache.hadoop.yarn.util.resource.Resources.none; @Private @Unstable public class FSLeafQueue extends FSQueue { - private static final Log LOG = LogFactory.getLog( - FSLeafQueue.class.getName()); + private static final Log LOG = LogFactory.getLog(FSLeafQueue.class.getName()); + private static final List<FSQueue> EMPTY_LIST = Collections.emptyList(); + private FairScheduler scheduler; private FSContext context; @@ -71,7 +72,6 @@ public class FSLeafQueue extends FSQueue { private Resource amResourceUsage; private final ActiveUsersManager activeUsersManager; - private static final List<FSQueue> EMPTY_LIST = Collections.emptyList(); public FSLeafQueue(String name, FairScheduler scheduler, FSParentQueue parent) { @@ -210,7 +210,7 @@ public class FSLeafQueue extends 
FSQueue { try { policy.computeShares(runnableApps, getFairShare()); if (checkStarvation) { - updatedStarvedApps(); + updateStarvedApps(); } } finally { readLock.unlock(); @@ -234,7 +234,7 @@ public class FSLeafQueue extends FSQueue { * one application that is starved. And, even if the queue is not * starved due to fairshare, there might still be starved applications. */ - private void updatedStarvedApps() { + private void updateStarvedApps() { // First identify starved applications and track total amount of // starvation (in resources) Resource fairShareStarvation = Resources.clone(none()); @@ -549,10 +549,33 @@ public class FSLeafQueue extends FSQueue { /** * Helper method for tests to check if a queue is starved for minShare. - * @return whether starved for minShare. + * @return whether starved for minshare */ @VisibleForTesting - boolean isStarvedForMinShare() { + private boolean isStarvedForMinShare() { return !Resources.isNone(minShareStarvation()); } + + /** + * Helper method for tests to check if a queue is starved for fairshare. + * @return whether starved for fairshare + */ + @VisibleForTesting + private boolean isStarvedForFairShare() { + for (FSAppAttempt app : runnableApps) { + if (app.isStarvedForFairShare()) { + return true; + } + } + return false; + } + + /** + * Helper method for tests to check if a queue is starved. 
+ * @return whether starved for either minshare or fairshare + */ + @VisibleForTesting + boolean isStarved() { + return isStarvedForMinShare() || isStarvedForFairShare(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index 01c830c..3579857 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -36,14 +36,14 @@ import java.util.TimerTask; /** * Thread that handles FairScheduler preemption. 
*/ -public class FSPreemptionThread extends Thread { +class FSPreemptionThread extends Thread { private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class); protected final FSContext context; private final FairScheduler scheduler; private final long warnTimeBeforeKill; private final Timer preemptionTimer; - public FSPreemptionThread(FairScheduler scheduler) { + FSPreemptionThread(FairScheduler scheduler) { this.scheduler = scheduler; this.context = scheduler.getContext(); FairSchedulerConfiguration fsConf = scheduler.getConf(); @@ -80,8 +80,10 @@ public class FSPreemptionThread extends Thread { * Given an app, identify containers to preempt to satisfy the app's next * resource request. * - * @param starvedApp - * @return + * @param starvedApp starved application for which we are identifying + * preemption targets + * @return list of containers to preempt to satisfy starvedApp, null if the + * app cannot be satisfied by preempting any running containers */ private List<RMContainer> identifyContainersToPreempt( FSAppAttempt starvedApp) { @@ -103,14 +105,13 @@ public class FSPreemptionThread extends Thread { // Reset containers for the new node being considered. containers.clear(); + // TODO (YARN-5829): Attempt to reserve the node for starved app. The + // subsequent if-check needs to be reworked accordingly. FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable(); if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) { // This node is already reserved by another app. Let us not consider // this for preemption. continue; - - // TODO (KK): If the nodeReservedApp is over its fairshare, may be it - // is okay to unreserve it if we find enough resources. } // Figure out list of containers to consider @@ -137,13 +138,15 @@ public class FSPreemptionThread extends Thread { // FSSchedulerNode#removeContainerForPreemption. 
node.addContainersForPreemption(containers); return containers; + } else { + // TODO (YARN-5829): Unreserve the node for the starved app. } } } return null; } - public void preemptContainers(List<RMContainer> containers) { + private void preemptContainers(List<RMContainer> containers) { // Warn application about containers to be killed for (RMContainer container : containers) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); @@ -151,7 +154,7 @@ public class FSPreemptionThread extends Thread { FSLeafQueue queue = app.getQueue(); LOG.info("Preempting container " + container + " from queue " + queue.getName()); - app.addPreemption(container); + app.trackContainerForPreemption(container); } // Schedule timer task to kill containers http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index a605af6..a27a222 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -105,7 +105,7 @@ public class FSSchedulerNode extends SchedulerNode { this.reservedAppSchedulable = null; } - public synchronized 
FSAppAttempt getReservedAppSchedulable() { + synchronized FSAppAttempt getReservedAppSchedulable() { return reservedAppSchedulable; } @@ -118,14 +118,14 @@ public class FSSchedulerNode extends SchedulerNode { * * @param containers container to mark */ - public void addContainersForPreemption(Collection<RMContainer> containers) { + void addContainersForPreemption(Collection<RMContainer> containers) { containersForPreemption.addAll(containers); } /** * @return set of containers marked for preemption. */ - public Set<RMContainer> getContainersForPreemption() { + Set<RMContainer> getContainersForPreemption() { return containersForPreemption; } @@ -134,7 +134,7 @@ public class FSSchedulerNode extends SchedulerNode { * * @param container container to remove */ - public void removeContainerForPreemption(RMContainer container) { + void removeContainerForPreemption(RMContainer container) { containersForPreemption.remove(container); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index abe8a6a..571f2e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -998,13 +998,6 @@ public class FairScheduler extends * Check if preemption is enabled and the utilization threshold for * preemption is met. * - * TODO (KK): Should we handle the case where usage is less than preemption - * threshold, but there are applications requesting resources on nodes that - * are otherwise occupied by long running applications over their - * fairshare? What if they are occupied by applications not over their - * fairshare? Does this mean YARN should not allocate all resources on a - * node to long-running services? - * * @return true if preemption should be attempted, false otherwise. */ private boolean shouldAttemptPreemption() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java index 323152d..a5b2d86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java @@ -41,10 +41,12 @@ import 
java.io.PrintWriter; */ public class TestFSAppStarvation extends FairSchedulerTestBase { - private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues"); + private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES"); // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore) private static final int NODE_CAPACITY_MULTIPLE = 4; + private static final String[] QUEUES = + {"no-preemption", "minshare", "fairshare.child", "drf.child"}; private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread; @@ -93,13 +95,15 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { assertNotNull("FSContext does not have an FSStarvedApps instance", scheduler.getContext().getStarvedApps()); - assertEquals("Expecting 2 starved applications, one each for the " + - "minshare and fairshare queues", 2, - preemptionThread.uniqueAppsAdded()); + assertEquals("Expecting 3 starved applications, one each for the " + + "minshare and fairshare queues", + 3, preemptionThread.uniqueAppsAdded()); // Verify the apps get added again on a subsequent update scheduler.update(); Thread.yield(); + + verifyLeafQueueStarvation(); assertTrue("Each app is marked as starved exactly once", preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded()); } @@ -121,6 +125,16 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { preemptionThread.totalAppsAdded()); } + private void verifyLeafQueueStarvation() { + for (String q : QUEUES) { + if (!q.equals("no-preemption")) { + boolean isStarved = + scheduler.getQueueManager().getLeafQueue(q, false).isStarved(); + assertTrue(isStarved); + } + } + } + private void setupClusterAndSubmitJobs() throws Exception { setupStarvedCluster(); submitAppsToEachLeafQueue(); @@ -167,21 +181,24 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { out.println("<minResources>2048mb,2vcores</minResources>"); out.println("</queue>"); - // Queue with fairshare preemption enabled + // FAIR queue with 
fairshare preemption enabled out.println("<queue name=\"fairshare\">"); out.println("<fairSharePreemptionThreshold>1" + "</fairSharePreemptionThreshold>"); out.println("<fairSharePreemptionTimeout>0" + "</fairSharePreemptionTimeout>"); + out.println("<schedulingPolicy>fair</schedulingPolicy>"); + addChildQueue(out); + out.println("</queue>"); - // Child queue under fairshare with same settings - out.println("<queue name=\"child\">"); + // DRF queue with fairshare preemption enabled + out.println("<queue name=\"drf\">"); out.println("<fairSharePreemptionThreshold>1" + "</fairSharePreemptionThreshold>"); out.println("<fairSharePreemptionTimeout>0" + "</fairSharePreemptionTimeout>"); - out.println("</queue>"); - + out.println("<schedulingPolicy>drf</schedulingPolicy>"); + addChildQueue(out); out.println("</queue>"); out.println("</allocations>"); @@ -210,9 +227,18 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size()); } + private void addChildQueue(PrintWriter out) { + // Child queue under fairshare with same settings + out.println("<queue name=\"child\">"); + out.println("<fairSharePreemptionThreshold>1" + + "</fairSharePreemptionThreshold>"); + out.println("<fairSharePreemptionTimeout>0" + + "</fairSharePreemptionTimeout>"); + out.println("</queue>"); + } + private void submitAppsToEachLeafQueue() { - String queues[] = {"no-preemption", "minshare", "fairshare.child"}; - for (String queue : queues) { + for (String queue : QUEUES) { createSchedulingRequest(1024, 1, "root." 
+ queue, "user", 1); } scheduler.update(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index b8f4a4d..98de8db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -105,12 +105,8 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println("<?xml version=\"1.0\"?>"); out.println("<allocations>"); - out.println("<queue name=\"queueA\">"); - out.println("<minResources>2048mb,0vcores</minResources>"); - out.println("</queue>"); - out.println("<queue name=\"queueB\">"); - out.println("<minResources>2048mb,0vcores</minResources>"); - out.println("</queue>"); + out.println("<queue name=\"queueA\"></queue>"); + out.println("<queue name=\"queueB\"></queue>"); out.println("</allocations>"); out.close(); @@ -143,162 +139,6 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { scheduler.update(); Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues(); assertEquals(3, queues.size()); - - // Queue A should be 
above min share, B below. - FSLeafQueue queueA = - scheduler.getQueueManager().getLeafQueue("queueA", false); - FSLeafQueue queueB = - scheduler.getQueueManager().getLeafQueue("queueB", false); -// TODO: assertFalse(queueA.isStarvedForMinShare()); -// TODO: assertTrue(queueB.isStarvedForMinShare()); - - // Node checks in again, should allocate for B - scheduler.handle(nodeEvent2); - // Now B should have min share ( = demand here) -// TODO: assertFalse(queueB.isStarvedForMinShare()); - } - - @Test (timeout = 5000) - public void testIsStarvedForFairShare() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println("<?xml version=\"1.0\"?>"); - out.println("<allocations>"); - out.println("<queue name=\"queueA\">"); - out.println("<weight>.2</weight>"); - out.println("</queue>"); - out.println("<queue name=\"queueB\">"); - out.println("<weight>.8</weight>"); - out.println("<fairSharePreemptionThreshold>.4</fairSharePreemptionThreshold>"); - out.println("<queue name=\"queueB1\">"); - out.println("</queue>"); - out.println("<queue name=\"queueB2\">"); - out.println("<fairSharePreemptionThreshold>.6</fairSharePreemptionThreshold>"); - out.println("</queue>"); - out.println("</queue>"); - out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>"); - out.println("</allocations>"); - out.close(); - - resourceManager = new MockRM(conf); - resourceManager.start(); - scheduler = (FairScheduler) resourceManager.getResourceScheduler(); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - scheduler.update(); - - // Queue A wants 4 * 1024. 
Node update gives this all to A - createSchedulingRequest(1 * 1024, "queueA", "user1", 4); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - for (int i = 0; i < 4; i ++) { - scheduler.handle(nodeEvent2); - } - - QueueManager queueMgr = scheduler.getQueueManager(); - FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); - assertEquals(4 * 1024, queueA.getResourceUsage().getMemorySize()); - - // Both queue B1 and queue B2 want 3 * 1024 - createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3); - createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3); - scheduler.update(); - for (int i = 0; i < 4; i ++) { - scheduler.handle(nodeEvent2); - } - - FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false); - FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false); - assertEquals(2 * 1024, queueB1.getResourceUsage().getMemorySize()); - assertEquals(2 * 1024, queueB2.getResourceUsage().getMemorySize()); - - // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share - // threshold is 1.6 * 1024 -// TODO: assertFalse(queueB1.isStarvedForFairShare()); - - // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share - // threshold is 2.4 * 1024 -// TODO: assertTrue(queueB2.isStarvedForFairShare()); - - // Node checks in again - scheduler.handle(nodeEvent2); - scheduler.handle(nodeEvent2); - assertEquals(3 * 1024, queueB1.getResourceUsage().getMemorySize()); - assertEquals(3 * 1024, queueB2.getResourceUsage().getMemorySize()); - - // Both queue B1 and queue B2 usages go to 3 * 1024 -// TODO: assertFalse(queueB1.isStarvedForFairShare()); -// TODO: assertFalse(queueB2.isStarvedForFairShare()); - } - - @Test (timeout = 5000) - public void testIsStarvedForFairShareDRF() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println("<?xml 
version=\"1.0\"?>"); - out.println("<allocations>"); - out.println("<queue name=\"queueA\">"); - out.println("<weight>.5</weight>"); - out.println("</queue>"); - out.println("<queue name=\"queueB\">"); - out.println("<weight>.5</weight>"); - out.println("</queue>"); - out.println("<defaultFairSharePreemptionThreshold>1</defaultFairSharePreemptionThreshold>"); - out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>"); - out.println("</allocations>"); - out.close(); - - resourceManager = new MockRM(conf); - resourceManager.start(); - scheduler = (FairScheduler) resourceManager.getResourceScheduler(); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - scheduler.update(); - - // Queue A wants 7 * 1024, 1. Node update gives this all to A - createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); - - QueueManager queueMgr = scheduler.getQueueManager(); - FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); - assertEquals(7 * 1024, queueA.getResourceUsage().getMemorySize()); - assertEquals(1, queueA.getResourceUsage().getVirtualCores()); - - // Queue B has 3 reqs : - // 1) 2 * 1024, 5 .. which will be granted - // 2) 1 * 1024, 1 .. which will be granted - // 3) 1 * 1024, 1 .. 
which wont - createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1); - createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2); - scheduler.update(); - for (int i = 0; i < 3; i ++) { - scheduler.handle(nodeEvent2); - } - - FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false); - assertEquals(3 * 1024, queueB.getResourceUsage().getMemorySize()); - assertEquals(6, queueB.getResourceUsage().getVirtualCores()); - - scheduler.update(); - - // Verify that Queue us not starved for fair share.. - // Since the Starvation logic now uses DRF when the policy = drf, The - // Queue should not be starved -// TODO: assertFalse(queueB.isStarvedForFairShare()); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java new file mode 100644 index 0000000..5736f75 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; + +import static org.junit.Assert.assertEquals; + +/** + * QueueManager tests that require a real scheduler + */ +public class TestQueueManagerRealScheduler extends FairSchedulerTestBase { + private final static File ALLOC_FILE = new File(TEST_DIR, "test-queue-mgr"); + + @Before + public void setup() throws IOException { + createConfiguration(); + writeAllocFile(30, 40); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, + ALLOC_FILE.getAbsolutePath()); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + } + + @After + public void teardown() { + ALLOC_FILE.deleteOnExit(); + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + } + + private void writeAllocFile(int defaultFairShareTimeout, + int fairShareTimeout) throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<queue name=\"default\">"); + out.println("</queue>"); + 
out.println("<queue name=\"queueA\">"); + out.println("</queue>"); + out.println("<queue name=\"queueB\">"); + out.println("<queue name=\"queueB1\">"); + out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>"); + out.println("</queue>"); + out.println("<queue name=\"queueB2\">"); + out.println("</queue>"); + out.println("</queue>"); + out.println("<queue name=\"queueC\">"); + out.println("</queue>"); + out.println("<defaultMinSharePreemptionTimeout>15" + + "</defaultMinSharePreemptionTimeout>"); + out.println("<defaultFairSharePreemptionTimeout>" + + + defaultFairShareTimeout + "</defaultFairSharePreemptionTimeout>"); + out.println("<fairSharePreemptionTimeout>" + + fairShareTimeout + "</fairSharePreemptionTimeout>"); + out.println("</allocations>"); + out.close(); + } + + @Test + public void testBackwardsCompatiblePreemptionConfiguration() + throws IOException { + // Check the min/fair share preemption timeout for each queue + QueueManager queueMgr = scheduler.getQueueManager(); + assertEquals(30000, queueMgr.getQueue("root") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("default") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueA") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB.queueB1") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueB.queueB2") + .getFairSharePreemptionTimeout()); + assertEquals(30000, queueMgr.getQueue("queueC") + .getFairSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("root") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("default") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueA") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueB") + .getMinSharePreemptionTimeout()); + assertEquals(5000, 
queueMgr.getQueue("queueB.queueB1") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueB.queueB2") + .getMinSharePreemptionTimeout()); + assertEquals(15000, queueMgr.getQueue("queueC") + .getMinSharePreemptionTimeout()); + + // Lower the fairshare preemption timeouts and verify it is picked + // correctly. + writeAllocFile(25, 30); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + assertEquals(25000, queueMgr.getQueue("root") + .getFairSharePreemptionTimeout()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org