YARN-5885. [YARN-4752] Cleanup YARN-4752 for merge (Contributed by Karthik 
Kambatla via Daniel Templeton)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/59bc9d16
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/59bc9d16
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/59bc9d16

Branch: refs/heads/YARN-4752
Commit: 59bc9d16a8514d65e893f4f30979973c54d2b0a3
Parents: a69763b
Author: Daniel Templeton <templ...@apache.org>
Authored: Wed Nov 16 17:18:33 2016 -0800
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Wed Nov 16 17:58:36 2016 -0800

----------------------------------------------------------------------
 .../scheduler/fair/FSAppAttempt.java            |  61 +++++--
 .../scheduler/fair/FSLeafQueue.java             |  37 ++++-
 .../scheduler/fair/FSPreemptionThread.java      |  21 ++-
 .../scheduler/fair/FSSchedulerNode.java         |   8 +-
 .../scheduler/fair/FairScheduler.java           |   7 -
 .../scheduler/fair/TestFSAppStarvation.java     |  48 ++++--
 .../scheduler/fair/TestFSLeafQueue.java         | 164 +------------------
 .../fair/TestQueueManagerRealScheduler.java     | 128 +++++++++++++++
 8 files changed, 257 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index f5bc2cd..39f4a3d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -81,12 +82,14 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
   private Resource fairShare = Resources.createResource(0, 0);
 
   // Preemption related variables
-  private Resource fairshareStarvation = Resources.none();
-  private Resource minshareStarvation = Resources.none();
   private final Resource preemptedResources = 
Resources.clone(Resources.none());
   private final Set<RMContainer> containersToPreempt = new HashSet<>();
+  private Resource fairshareStarvation = Resources.none();
   private long lastTimeAtFairShare;
 
+  // minShareStarvation attributed to this application by the leaf queue
+  private Resource minshareStarvation = Resources.none();
+
   // Used to record node reservation by an app.
   // Key = RackName, Value = Set of Nodes reserved by app on rack
   private Map<String, Set<String>> reservations = new HashMap<>();
@@ -149,7 +152,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
 
       // Remove from the list of containers
       liveContainers.remove(rmContainer.getContainerId());
-      removePreemption(rmContainer);
+      untrackContainerForPreemption(rmContainer);
 
       Resource containerResource = rmContainer.getContainer().getResource();
       RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
@@ -510,26 +513,42 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
   }
 
   // Preemption related methods
+
+  /**
+   * Get overall starvation - fairshare and attributed minshare.
+   *
+   * @return total starvation attributed to this application
+   */
   Resource getStarvation() {
     return Resources.add(fairshareStarvation, minshareStarvation);
   }
 
+  /**
+   * Set the minshare attributed to this application. To be called only from
+   * {@link FSLeafQueue#updateStarvedApps}.
+   *
+   * @param starvation minshare starvation attributed to this app
+   */
   void setMinshareStarvation(Resource starvation) {
     this.minshareStarvation = starvation;
   }
 
+  /**
+   * Reset the minshare starvation attributed to this application. To be
+   * called only from {@link FSLeafQueue#updateStarvedApps}
+   */
   void resetMinshareStarvation() {
     this.minshareStarvation = Resources.none();
   }
 
-  void addPreemption(RMContainer container) {
+  void trackContainerForPreemption(RMContainer container) {
     containersToPreempt.add(container);
     synchronized (preemptedResources) {
       Resources.addTo(preemptedResources, container.getAllocatedResource());
     }
   }
 
-  void removePreemption(RMContainer container) {
+  private void untrackContainerForPreemption(RMContainer container) {
     synchronized (preemptedResources) {
       Resources.subtractFrom(preemptedResources,
           container.getAllocatedResource());
@@ -540,7 +559,6 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
   Set<RMContainer> getPreemptionContainers() {
     return containersToPreempt;
   }
-  
 
   private Resource getPreemptedResources() {
     synchronized (preemptedResources) {
@@ -563,8 +581,8 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
     }
 
     // Check if any of the parent queues are not preemptable
-    // TODO (KK): Propagate the "preemptable" flag all the way down to the app
-    // to avoid recursing up every time.
+    // TODO (YARN-5831): Propagate the "preemptable" flag all the way down to
+    // the app to avoid recursing up every time.
     for (FSQueue q = getQueue();
         !q.getQueueName().equals("root");
         q = q.getParent()) {
@@ -585,8 +603,9 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
 
   /**
    * Create and return a container object reflecting an allocation for the
-   * given appliction on the given node with the given capability and
+   * given application on the given node with the given capability and
    * priority.
+   *
    * @param node Node
    * @param capability Capability
    * @param schedulerKey Scheduler Key
@@ -1076,6 +1095,15 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
     return appSchedulingInfo.getNextResourceRequest();
   }
 
+  /**
+   * Helper method that captures if this app is identified to be starved.
+   * @return true if the app is starved for fairshare, false otherwise
+   */
+  @VisibleForTesting
+  boolean isStarvedForFairShare() {
+    return !Resources.isNone(fairshareStarvation);
+  }
+
   /* Schedulable methods implementation */
 
   @Override
@@ -1105,14 +1133,13 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
 
   @Override
   public Resource getResourceUsage() {
-    // Here the getPreemptedResources() always return zero, except in
-    // a preemption round
-    // In the common case where preempted resource is zero, return the
-    // current consumption Resource object directly without calling
-    // Resources.subtract which creates a new Resource object for each call.
-    return getPreemptedResources().equals(Resources.none()) ?
-        getCurrentConsumption() :
-        Resources.subtract(getCurrentConsumption(), getPreemptedResources());
+    /*
+     * getResourcesToPreempt() returns zero, except when there are containers
+     * to preempt. Avoid creating an object in the common case.
+     */
+    return getPreemptedResources().equals(Resources.none())
+        ? getCurrentConsumption()
+        : Resources.subtract(getCurrentConsumption(), getPreemptedResources());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 3fcf627..343e9c3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -49,8 +49,9 @@ import static 
org.apache.hadoop.yarn.util.resource.Resources.none;
 @Private
 @Unstable
 public class FSLeafQueue extends FSQueue {
-  private static final Log LOG = LogFactory.getLog(
-      FSLeafQueue.class.getName());
+  private static final Log LOG = 
LogFactory.getLog(FSLeafQueue.class.getName());
+  private static final List<FSQueue> EMPTY_LIST = Collections.emptyList();
+
   private FairScheduler scheduler;
   private FSContext context;
 
@@ -71,7 +72,6 @@ public class FSLeafQueue extends FSQueue {
   private Resource amResourceUsage;
 
   private final ActiveUsersManager activeUsersManager;
-  private static final List<FSQueue> EMPTY_LIST = Collections.emptyList();
 
   public FSLeafQueue(String name, FairScheduler scheduler,
       FSParentQueue parent) {
@@ -210,7 +210,7 @@ public class FSLeafQueue extends FSQueue {
     try {
       policy.computeShares(runnableApps, getFairShare());
       if (checkStarvation) {
-        updatedStarvedApps();
+        updateStarvedApps();
       }
     } finally {
       readLock.unlock();
@@ -234,7 +234,7 @@ public class FSLeafQueue extends FSQueue {
    * one application that is starved. And, even if the queue is not
    * starved due to fairshare, there might still be starved applications.
    */
-  private void updatedStarvedApps() {
+  private void updateStarvedApps() {
     // First identify starved applications and track total amount of
     // starvation (in resources)
     Resource fairShareStarvation = Resources.clone(none());
@@ -549,10 +549,33 @@ public class FSLeafQueue extends FSQueue {
 
   /**
    * Helper method for tests to check if a queue is starved for minShare.
-   * @return whether starved for minShare.
+   * @return whether starved for minshare
    */
   @VisibleForTesting
-  boolean isStarvedForMinShare() {
+  private boolean isStarvedForMinShare() {
     return !Resources.isNone(minShareStarvation());
   }
+
+  /**
+   * Helper method for tests to check if a queue is starved for fairshare.
+   * @return whether starved for fairshare
+   */
+  @VisibleForTesting
+  private boolean isStarvedForFairShare() {
+    for (FSAppAttempt app : runnableApps) {
+      if (app.isStarvedForFairShare()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Helper method for tests to check if a queue is starved.
+   * @return whether starved for either minshare or fairshare
+   */
+  @VisibleForTesting
+  boolean isStarved() {
+    return isStarvedForMinShare() || isStarvedForFairShare();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 01c830c..3579857 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -36,14 +36,14 @@ import java.util.TimerTask;
 /**
  * Thread that handles FairScheduler preemption.
  */
-public class FSPreemptionThread extends Thread {
+class FSPreemptionThread extends Thread {
   private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class);
   protected final FSContext context;
   private final FairScheduler scheduler;
   private final long warnTimeBeforeKill;
   private final Timer preemptionTimer;
 
-  public FSPreemptionThread(FairScheduler scheduler) {
+  FSPreemptionThread(FairScheduler scheduler) {
     this.scheduler = scheduler;
     this.context = scheduler.getContext();
     FairSchedulerConfiguration fsConf = scheduler.getConf();
@@ -80,8 +80,10 @@ public class FSPreemptionThread extends Thread {
    * Given an app, identify containers to preempt to satisfy the app's next
    * resource request.
    *
-   * @param starvedApp
-   * @return
+   * @param starvedApp starved application for which we are identifying
+   *                   preemption targets
+   * @return list of containers to preempt to satisfy starvedApp, null if the
+   * app cannot be satisfied by preempting any running containers
    */
   private List<RMContainer> identifyContainersToPreempt(
       FSAppAttempt starvedApp) {
@@ -103,14 +105,13 @@ public class FSPreemptionThread extends Thread {
       // Reset containers for the new node being considered.
       containers.clear();
 
+      // TODO (YARN-5829): Attempt to reserve the node for starved app. The
+      // subsequent if-check needs to be reworked accordingly.
       FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
       if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
         // This node is already reserved by another app. Let us not consider
         // this for preemption.
         continue;
-
-        // TODO (KK): If the nodeReservedApp is over its fairshare, may be it
-        // is okay to unreserve it if we find enough resources.
       }
 
       // Figure out list of containers to consider
@@ -137,13 +138,15 @@ public class FSPreemptionThread extends Thread {
           // FSSchedulerNode#removeContainerForPreemption.
           node.addContainersForPreemption(containers);
           return containers;
+        } else {
+          // TODO (YARN-5829): Unreserve the node for the starved app.
         }
       }
     }
     return null;
   }
 
-  public void preemptContainers(List<RMContainer> containers) {
+  private void preemptContainers(List<RMContainer> containers) {
     // Warn application about containers to be killed
     for (RMContainer container : containers) {
       ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
@@ -151,7 +154,7 @@ public class FSPreemptionThread extends Thread {
       FSLeafQueue queue = app.getQueue();
       LOG.info("Preempting container " + container +
           " from queue " + queue.getName());
-      app.addPreemption(container);
+      app.trackContainerForPreemption(container);
     }
 
     // Schedule timer task to kill containers

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index a605af6..a27a222 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -105,7 +105,7 @@ public class FSSchedulerNode extends SchedulerNode {
     this.reservedAppSchedulable = null;
   }
 
-  public synchronized FSAppAttempt getReservedAppSchedulable() {
+  synchronized FSAppAttempt getReservedAppSchedulable() {
     return reservedAppSchedulable;
   }
 
@@ -118,14 +118,14 @@ public class FSSchedulerNode extends SchedulerNode {
    *
    * @param containers container to mark
    */
-  public void addContainersForPreemption(Collection<RMContainer> containers) {
+  void addContainersForPreemption(Collection<RMContainer> containers) {
     containersForPreemption.addAll(containers);
   }
 
   /**
    * @return set of containers marked for preemption.
    */
-  public Set<RMContainer> getContainersForPreemption() {
+  Set<RMContainer> getContainersForPreemption() {
     return containersForPreemption;
   }
 
@@ -134,7 +134,7 @@ public class FSSchedulerNode extends SchedulerNode {
    *
    * @param container container to remove
    */
-  public void removeContainerForPreemption(RMContainer container) {
+  void removeContainerForPreemption(RMContainer container) {
     containersForPreemption.remove(container);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index abe8a6a..571f2e6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -998,13 +998,6 @@ public class FairScheduler extends
    * Check if preemption is enabled and the utilization threshold for
    * preemption is met.
    *
-   * TODO (KK): Should we handle the case where usage is less than preemption
-   * threshold, but there are applications requesting resources on nodes that
-   * are otherwise occupied by long running applications over their
-   * fairshare? What if they are occupied by applications not over their
-   * fairshare? Does this mean YARN should not allocate all resources on a
-   * node to long-running services?
-   *
    * @return true if preemption should be attempted, false otherwise.
    */
   private boolean shouldAttemptPreemption() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
index 323152d..a5b2d86 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -41,10 +41,12 @@ import java.io.PrintWriter;
  */
 public class TestFSAppStarvation extends FairSchedulerTestBase {
 
-  private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
 
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
+  private static final String[] QUEUES =
+      {"no-preemption", "minshare", "fairshare.child", "drf.child"};
 
   private FairSchedulerWithMockPreemption.MockPreemptionThread 
preemptionThread;
 
@@ -93,13 +95,15 @@ public class TestFSAppStarvation extends 
FairSchedulerTestBase {
 
     assertNotNull("FSContext does not have an FSStarvedApps instance",
         scheduler.getContext().getStarvedApps());
-    assertEquals("Expecting 2 starved applications, one each for the " +
-            "minshare and fairshare queues", 2,
-        preemptionThread.uniqueAppsAdded());
+    assertEquals("Expecting 3 starved applications, one each for the "
+            + "minshare and fairshare queues",
+        3, preemptionThread.uniqueAppsAdded());
 
     // Verify the apps get added again on a subsequent update
     scheduler.update();
     Thread.yield();
+
+    verifyLeafQueueStarvation();
     assertTrue("Each app is marked as starved exactly once",
         preemptionThread.totalAppsAdded() > 
preemptionThread.uniqueAppsAdded());
   }
@@ -121,6 +125,16 @@ public class TestFSAppStarvation extends 
FairSchedulerTestBase {
         preemptionThread.totalAppsAdded());
   }
 
+  private void verifyLeafQueueStarvation() {
+    for (String q : QUEUES) {
+      if (!q.equals("no-preemption")) {
+        boolean isStarved =
+            scheduler.getQueueManager().getLeafQueue(q, false).isStarved();
+        assertTrue(isStarved);
+      }
+    }
+  }
+
   private void setupClusterAndSubmitJobs() throws Exception {
     setupStarvedCluster();
     submitAppsToEachLeafQueue();
@@ -167,21 +181,24 @@ public class TestFSAppStarvation extends 
FairSchedulerTestBase {
     out.println("<minResources>2048mb,2vcores</minResources>");
     out.println("</queue>");
 
-    // Queue with fairshare preemption enabled
+    // FAIR queue with fairshare preemption enabled
     out.println("<queue name=\"fairshare\">");
     out.println("<fairSharePreemptionThreshold>1" +
         "</fairSharePreemptionThreshold>");
     out.println("<fairSharePreemptionTimeout>0" +
         "</fairSharePreemptionTimeout>");
+    out.println("<schedulingPolicy>fair</schedulingPolicy>");
+    addChildQueue(out);
+    out.println("</queue>");
 
-    // Child queue under fairshare with same settings
-    out.println("<queue name=\"child\">");
+    // DRF queue with fairshare preemption enabled
+    out.println("<queue name=\"drf\">");
     out.println("<fairSharePreemptionThreshold>1" +
         "</fairSharePreemptionThreshold>");
     out.println("<fairSharePreemptionTimeout>0" +
         "</fairSharePreemptionTimeout>");
-    out.println("</queue>");
-
+    out.println("<schedulingPolicy>drf</schedulingPolicy>");
+    addChildQueue(out);
     out.println("</queue>");
 
     out.println("</allocations>");
@@ -210,9 +227,18 @@ public class TestFSAppStarvation extends 
FairSchedulerTestBase {
     assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
   }
 
+  private void addChildQueue(PrintWriter out) {
+    // Child queue under fairshare with same settings
+    out.println("<queue name=\"child\">");
+    out.println("<fairSharePreemptionThreshold>1" +
+        "</fairSharePreemptionThreshold>");
+    out.println("<fairSharePreemptionTimeout>0" +
+        "</fairSharePreemptionTimeout>");
+    out.println("</queue>");
+  }
+
   private void submitAppsToEachLeafQueue() {
-    String queues[] = {"no-preemption", "minshare", "fairshare.child"};
-    for (String queue : queues) {
+    for (String queue : QUEUES) {
       createSchedulingRequest(1024, 1, "root." + queue, "user", 1);
     }
     scheduler.update();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index b8f4a4d..98de8db 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -105,12 +105,8 @@ public class TestFSLeafQueue extends FairSchedulerTestBase 
{
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
     out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
+    out.println("<queue name=\"queueA\"></queue>");
+    out.println("<queue name=\"queueB\"></queue>");
     out.println("</allocations>");
     out.close();
 
@@ -143,162 +139,6 @@ public class TestFSLeafQueue extends 
FairSchedulerTestBase {
     scheduler.update();
     Collection<FSLeafQueue> queues = 
scheduler.getQueueManager().getLeafQueues();
     assertEquals(3, queues.size());
-
-    // Queue A should be above min share, B below.
-    FSLeafQueue queueA =
-        scheduler.getQueueManager().getLeafQueue("queueA", false);
-    FSLeafQueue queueB =
-        scheduler.getQueueManager().getLeafQueue("queueB", false);
-// TODO:    assertFalse(queueA.isStarvedForMinShare());
-// TODO:    assertTrue(queueB.isStarvedForMinShare());
-
-    // Node checks in again, should allocate for B
-    scheduler.handle(nodeEvent2);
-    // Now B should have min share ( = demand here)
-// TODO:     assertFalse(queueB.isStarvedForMinShare());
-  }
-
-  @Test (timeout = 5000)
-  public void testIsStarvedForFairShare() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.2</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.8</weight>");
-    
out.println("<fairSharePreemptionThreshold>.4</fairSharePreemptionThreshold>");
-    out.println("<queue name=\"queueB1\">");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB2\">");
-    
out.println("<fairSharePreemptionThreshold>.6</fairSharePreemptionThreshold>");
-    out.println("</queue>");
-    out.println("</queue>");
-    
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
-    out.println("</allocations>");
-    out.close();
-
-    resourceManager = new MockRM(conf);
-    resourceManager.start();
-    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
-
-    // Add one big node (only care about aggregate capacity)
-    RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
-            "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    scheduler.update();
-
-    // Queue A wants 4 * 1024. Node update gives this all to A
-    createSchedulingRequest(1 * 1024, "queueA", "user1", 4);
-    scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
-    for (int i = 0; i < 4; i ++) {
-      scheduler.handle(nodeEvent2);
-    }
-
-    QueueManager queueMgr = scheduler.getQueueManager();
-    FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
-    assertEquals(4 * 1024, queueA.getResourceUsage().getMemorySize());
-
-    // Both queue B1 and queue B2 want 3 * 1024
-    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3);
-    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3);
-    scheduler.update();
-    for (int i = 0; i < 4; i ++) {
-      scheduler.handle(nodeEvent2);
-    }
-
-    FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false);
-    FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false);
-    assertEquals(2 * 1024, queueB1.getResourceUsage().getMemorySize());
-    assertEquals(2 * 1024, queueB2.getResourceUsage().getMemorySize());
-
-    // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair 
share
-    // threshold is 1.6 * 1024
-// TODO:   assertFalse(queueB1.isStarvedForFairShare());
-
-    // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair 
share
-    // threshold is 2.4 * 1024
-// TODO:   assertTrue(queueB2.isStarvedForFairShare());
-
-    // Node checks in again
-    scheduler.handle(nodeEvent2);
-    scheduler.handle(nodeEvent2);
-    assertEquals(3 * 1024, queueB1.getResourceUsage().getMemorySize());
-    assertEquals(3 * 1024, queueB2.getResourceUsage().getMemorySize());
-
-    // Both queue B1 and queue B2 usages go to 3 * 1024
-// TODO:   assertFalse(queueB1.isStarvedForFairShare());
-// TODO:   assertFalse(queueB2.isStarvedForFairShare());
-  }
-
-  @Test (timeout = 5000)
-  public void testIsStarvedForFairShareDRF() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.5</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.5</weight>");
-    out.println("</queue>");
-    
out.println("<defaultFairSharePreemptionThreshold>1</defaultFairSharePreemptionThreshold>");
-    
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
-    out.println("</allocations>");
-    out.close();
-
-    resourceManager = new MockRM(conf);
-    resourceManager.start();
-    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
-
-    // Add one big node (only care about aggregate capacity)
-    RMNode node1 =
-            MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 
1,
-                    "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    scheduler.update();
-
-    // Queue A wants 7 * 1024, 1. Node update gives this all to A
-    createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1);
-    scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(nodeEvent2);
-
-    QueueManager queueMgr = scheduler.getQueueManager();
-    FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
-    assertEquals(7 * 1024, queueA.getResourceUsage().getMemorySize());
-    assertEquals(1, queueA.getResourceUsage().getVirtualCores());
-
-    // Queue B has 3 reqs :
-    // 1) 2 * 1024, 5 .. which will be granted
-    // 2) 1 * 1024, 1 .. which will be granted
-    // 3) 1 * 1024, 1 .. which wont
-    createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1);
-    createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2);
-    scheduler.update();
-    for (int i = 0; i < 3; i ++) {
-      scheduler.handle(nodeEvent2);
-    }
-
-    FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false);
-    assertEquals(3 * 1024, queueB.getResourceUsage().getMemorySize());
-    assertEquals(6, queueB.getResourceUsage().getVirtualCores());
-
-    scheduler.update();
-
-    // Verify that Queue us not starved for fair share..
-    // Since the Starvation logic now uses DRF when the policy = drf, The
-    // Queue should not be starved
-// TODO:   assertFalse(queueB.isStarvedForFairShare());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59bc9d16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java
new file mode 100644
index 0000000..5736f75
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManagerRealScheduler.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * QueueManager tests that require a real scheduler
+ */
+public class TestQueueManagerRealScheduler extends FairSchedulerTestBase {
+  private final static File ALLOC_FILE = new File(TEST_DIR, "test-queue-mgr");
+
+  @Before
+  public void setup() throws IOException {
+    createConfiguration();
+    writeAllocFile(30, 40);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+  }
+
+  @After
+  public void teardown() {
+    ALLOC_FILE.deleteOnExit();
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+  }
+
+  private void writeAllocFile(int defaultFairShareTimeout,
+      int fairShareTimeout) throws IOException {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<queue name=\"queueB1\">");
+    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB2\">");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueC\">");
+    out.println("</queue>");
+    out.println("<defaultMinSharePreemptionTimeout>15"
+        + "</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>" +
+        + defaultFairShareTimeout + "</defaultFairSharePreemptionTimeout>");
+    out.println("<fairSharePreemptionTimeout>"
+        + fairShareTimeout + "</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+  }
+
+  @Test
+  public void testBackwardsCompatiblePreemptionConfiguration()
+      throws IOException {
+    // Check the min/fair share preemption timeout for each queue
+    QueueManager queueMgr = scheduler.getQueueManager();
+    assertEquals(30000, queueMgr.getQueue("root")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("default")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueA")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
+        .getFairSharePreemptionTimeout());
+    assertEquals(30000, queueMgr.getQueue("queueC")
+        .getFairSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("root")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("default")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueA")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueB")
+        .getMinSharePreemptionTimeout());
+    assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
+        .getMinSharePreemptionTimeout());
+    assertEquals(15000, queueMgr.getQueue("queueC")
+        .getMinSharePreemptionTimeout());
+
+    // Lower the fairshare preemption timeouts and verify it is picked
+    // correctly.
+    writeAllocFile(25, 30);
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+    assertEquals(25000, queueMgr.getQueue("root")
+        .getFairSharePreemptionTimeout());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to