YARN-5819. [YARN-4752] Verify fairshare and minshare preemption (Contributed by Karthik Kambatla via Daniel Templeton)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a69763b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a69763b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a69763b8

Branch: refs/heads/YARN-4752
Commit: a69763b81a439b6df2ed0c164f83fbabf3383cfa
Parents: 066576c
Author: Daniel Templeton <templ...@apache.org>
Authored: Tue Nov 15 09:17:56 2016 -0800
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Wed Nov 16 17:58:36 2016 -0800

----------------------------------------------------------------------
 .../scheduler/fair/FSAppAttempt.java            |  38 ++-
 .../scheduler/fair/FSPreemptionThread.java      |  23 +-
 .../scheduler/fair/FSSchedulerNode.java         |  34 +++
 .../scheduler/fair/FairSchedulerTestBase.java   |  37 ++-
 .../scheduler/fair/TestFSAppStarvation.java     |  21 +-
 .../fair/TestFairSchedulerPreemption.java       | 275 +++++++++++++++++++
 6 files changed, 389 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 6b88bd0..f5bc2cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -83,7 +83,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   // Preemption related variables
   private Resource fairshareStarvation = Resources.none();
   private Resource minshareStarvation = Resources.none();
-  private Resource preemptedResources = Resources.createResource(0);
+  private final Resource preemptedResources = Resources.clone(Resources.none());
   private final Set<RMContainer> containersToPreempt = new HashSet<>();
   private long lastTimeAtFairShare;
 
@@ -149,7 +149,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
       // Remove from the list of containers
       liveContainers.remove(rmContainer.getContainerId());
-      containersToPreempt.remove(rmContainer);
+      removePreemption(rmContainer);
 
       Resource containerResource = rmContainer.getContainer().getResource();
       RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
@@ -524,7 +524,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
   void addPreemption(RMContainer container) {
     containersToPreempt.add(container);
-    Resources.addTo(preemptedResources, container.getAllocatedResource());
+    synchronized (preemptedResources) {
+      Resources.addTo(preemptedResources, container.getAllocatedResource());
+    }
+  }
+
+  void removePreemption(RMContainer container) {
+    synchronized (preemptedResources) {
+      Resources.subtractFrom(preemptedResources,
+          container.getAllocatedResource());
+    }
+    containersToPreempt.remove(container);
   }
 
   Set<RMContainer> getPreemptionContainers() {
@@ -533,7 +543,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   
 
   private Resource getPreemptedResources() {
-    return preemptedResources;
+    synchronized (preemptedResources) {
+      return preemptedResources;
+    }
   }
 
   boolean canContainerBePreempted(RMContainer container) {
@@ -545,20 +557,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       return false;
     }
 
+    if (containersToPreempt.contains(container)) {
+      // The container is already under consideration for preemption
+      return false;
+    }
+
     // Check if any of the parent queues are not preemptable
     // TODO (KK): Propagate the "preemptable" flag all the way down to the app
     // to avoid recursing up every time.
-    FSQueue queue = getQueue();
-    while (!queue.getQueueName().equals("root")) {
-      if (!queue.isPreemptable()) {
+    for (FSQueue q = getQueue();
+        !q.getQueueName().equals("root");
+        q = q.getParent()) {
+      if (!q.isPreemptable()) {
         return false;
       }
     }
 
     // Check if the app's allocation will be over its fairshare even
     // after preempting this container
+    Resource currentUsage = getResourceUsage();
+    Resource fairshare = getFairShare();
+    Resource overFairShareBy = Resources.subtract(currentUsage, fairshare);
+
     return (Resources.fitsIn(container.getAllocatedResource(),
-        Resources.subtract(getResourceUsage(), getFairShare())));
+        overFairShareBy));
   }
 
   /**

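For illustration, the accounting pattern added above (a shared mutable Resource total updated under a lock, alongside the set of containers flagged for preemption) reduces to the following standalone sketch; the class and member names are hypothetical and not part of the patch:

import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch (not from the patch): a running total guarded by a
// monitor, alongside the set of containers flagged for preemption.
class PreemptionAccounting {
  private long preemptedMemoryMb = 0;                   // stands in for Resource
  private final Set<String> flagged = new HashSet<>();  // container ids

  synchronized void add(String containerId, long memoryMb) {
    flagged.add(containerId);
    preemptedMemoryMb += memoryMb;     // cf. Resources.addTo(...)
  }

  synchronized void remove(String containerId, long memoryMb) {
    preemptedMemoryMb -= memoryMb;     // cf. Resources.subtractFrom(...)
    flagged.remove(containerId);
  }

  synchronized long total() {
    return preemptedMemoryMb;
  }
}
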
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 905b6f2..01c830c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -83,8 +83,8 @@ public class FSPreemptionThread extends Thread {
    * @param starvedApp
    * @return
    */
-  private List<RMContainer> identifyContainersToPreempt(FSAppAttempt
-      starvedApp) {
+  private List<RMContainer> identifyContainersToPreempt(
+      FSAppAttempt starvedApp) {
     List<RMContainer> containers = new ArrayList<>(); // return value
 
     // Find the nodes that match the next resource request
@@ -113,20 +113,29 @@ public class FSPreemptionThread extends Thread {
         // is okay to unreserve it if we find enough resources.
       }
 
+      // Figure out list of containers to consider
+      List<RMContainer> containersToCheck =
+          node.getCopiedListOfRunningContainers();
+      containersToCheck.removeAll(node.getContainersForPreemption());
+
       // Initialize potential with unallocated resources
       Resource potential = Resources.clone(node.getUnallocatedResource());
-      for (RMContainer container : node.getCopiedListOfRunningContainers()) {
+      for (RMContainer container : containersToCheck) {
         FSAppAttempt app =
             scheduler.getSchedulerApp(container.getApplicationAttemptId());
 
         if (app.canContainerBePreempted(container)) {
+          // Flag container for preemption
+          containers.add(container);
           Resources.addTo(potential, container.getAllocatedResource());
         }
 
         // Check if we have already identified enough containers
         if (Resources.fitsIn(requestCapability, potential)) {
-          // TODO (KK): Reserve containers so they can't be taken by another
-          // app
+          // Mark the containers as being considered for preemption on the node.
+          // Make sure the containers are subsequently removed by calling
+          // FSSchedulerNode#removeContainerForPreemption.
+          node.addContainersForPreemption(containers);
           return containers;
         }
       }
@@ -166,6 +175,10 @@ public class FSPreemptionThread extends Thread {
         LOG.info("Killing container " + container);
         scheduler.completedContainer(
             container, status, RMContainerEventType.KILL);
+
+        FSSchedulerNode containerNode = (FSSchedulerNode)
+            scheduler.getNodeTracker().getNode(container.getAllocatedNode());
+        containerNode.removeContainerForPreemption(container);
       }
     }
   }

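A rough standalone sketch of the per-node selection loop above, with plain integers standing in for Resource objects (all names hypothetical, not part of the patch):

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not from the patch): greedily flag containers until
// free space plus flagged space can satisfy the starved request. The real
// code additionally filters candidates via canContainerBePreempted() and
// skips containers already marked for preemption on the node.
class GreedySelectionSketch {
  static List<Integer> select(int request, int unallocated,
      List<Integer> runningContainerSizes) {
    List<Integer> picked = new ArrayList<>();
    int potential = unallocated;             // start with free resources
    for (int size : runningContainerSizes) {
      picked.add(size);                      // flag this container
      potential += size;
      if (potential >= request) {            // cf. Resources.fitsIn(...)
        return picked;                       // enough identified on this node
      }
    }
    return null;                             // node cannot satisfy the request
  }
}
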
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 024ec67..a605af6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -29,6 +29,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+
 @Private
 @Unstable
 public class FSSchedulerNode extends SchedulerNode {
@@ -36,6 +40,8 @@ public class FSSchedulerNode extends SchedulerNode {
   private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
 
   private FSAppAttempt reservedAppSchedulable;
+  private final Set<RMContainer> containersForPreemption =
+      new ConcurrentSkipListSet<>();
 
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     super(node, usePortForNodeName);
@@ -103,4 +109,32 @@ public class FSSchedulerNode extends SchedulerNode {
     return reservedAppSchedulable;
   }
 
+  /**
+   * Mark {@code containers} as being considered for preemption so they are
+   * not considered again. Every call to this method requires a corresponding
+   * call to {@link #removeContainerForPreemption}; otherwise a container
+   * would stay marked forever, never be reconsidered, and its entry would
+   * leak memory.
+   *
+   * @param containers containers to mark
+   */
+  public void addContainersForPreemption(Collection<RMContainer> containers) {
+    containersForPreemption.addAll(containers);
+  }
+
+  /**
+   * @return set of containers marked for preemption.
+   */
+  public Set<RMContainer> getContainersForPreemption() {
+    return containersForPreemption;
+  }
+
+  /**
+   * Remove container from the set of containers marked for preemption.
+   *
+   * @param container container to remove
+   */
+  public void removeContainerForPreemption(RMContainer container) {
+    containersForPreemption.remove(container);
+  }
 }

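The marker set above is a ConcurrentSkipListSet, so the preemption thread and the scheduler can touch it without extra locking. A toy sketch of the mark/unmark lifecycle, with strings standing in for containers (names hypothetical, not part of the patch):

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical sketch (not from the patch): every mark() must eventually be
// balanced by unmark() once the container is killed, or the entry leaks.
class PreemptionMarkerSketch {
  private final Set<String> marked = new ConcurrentSkipListSet<>();

  void mark(Iterable<String> containerIds) {
    for (String id : containerIds) {
      marked.add(id);
    }
  }

  void unmark(String containerId) {
    marked.remove(containerId);
  }

  public static void main(String[] args) {
    PreemptionMarkerSketch node = new PreemptionMarkerSketch();
    node.mark(Arrays.asList("container_1", "container_2"));
    node.unmark("container_1");              // after the kill completes
    System.out.println(node.marked);         // [container_2]
  }
}
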
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
index 6a308a1..992b75d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
@@ -17,14 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
-import org.junit.Assert;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,7 +31,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -50,9 +44,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import org.junit.Assert;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
 public class FairSchedulerTestBase {
   public final static String TEST_DIR =
      new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
@@ -70,6 +72,11 @@ public class FairSchedulerTestBase {
   private static final int SLEEP_DURATION = 10;
   private static final int SLEEP_RETRIES = 1000;
 
+  /**
+   * The list of nodes added to the cluster using the {@link #addNode} method.
+   */
+  protected final List<RMNode> rmNodes = new ArrayList<>();
+
   // Helper methods
   public Configuration createConfiguration() {
     conf = new YarnConfiguration();
@@ -280,4 +287,18 @@ public class FairSchedulerTestBase {
     Assert.assertEquals(resource.getVirtualCores(),
         app.getCurrentConsumption().getVirtualCores());
   }
+
+  /**
+   * Add a node to the cluster and track the nodes in {@link #rmNodes}.
+   * @param memory memory capacity of the node
+   * @param cores cpu capacity of the node
+   */
+  protected void addNode(int memory, int cores) {
+    int id = rmNodes.size() + 1;
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id,
+            "127.0.0." + id);
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+    rmNodes.add(node);
+  }
 }

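A subclass would use the new helper roughly as follows; this is a hypothetical sketch (the class name and method are illustrative only), and the new test class below does essentially the same thing in setupCluster():

// Hypothetical subclass sketch: each addNode() call registers one
// MockNodes-backed node with the scheduler and records it in rmNodes,
// so later NodeUpdateSchedulerEvents can be replayed per node.
public class TwoNodeClusterTest extends FairSchedulerTestBase {
  protected void setupTwoNodeCluster() {
    addNode(4 * 1024, 4);   // 4 GB / 4 vcores, host 127.0.0.1
    addNode(4 * 1024, 4);   // 4 GB / 4 vcores, host 127.0.0.2
  }
}
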
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
index 0e5511b..323152d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -19,13 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -38,8 +35,6 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * Test class to verify identification of app starvation
@@ -47,7 +42,6 @@ import java.util.List;
 public class TestFSAppStarvation extends FairSchedulerTestBase {
 
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
-  private final List<RMNode> rmNodes = new ArrayList<>();
 
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
@@ -130,7 +124,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
   private void setupClusterAndSubmitJobs() throws Exception {
     setupStarvedCluster();
     submitAppsToEachLeafQueue();
-    sendNodeUpdateEvents();
+    sendEnoughNodeUpdatesToAssignFully();
 
     // Sleep to hit the preemption timeouts
     Thread.sleep(10);
@@ -211,7 +205,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
         = createSchedulingRequest(1024, 1, "root.default", "default", 8);
 
     scheduler.update();
-    sendNodeUpdateEvents();
+    sendEnoughNodeUpdatesToAssignFully();
 
     assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
   }
@@ -224,16 +218,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
     scheduler.update();
   }
 
-  private void addNode(int memory, int cores) {
-    int id = rmNodes.size() + 1;
-    RMNode node =
-        MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id,
-            "127.0.0." + id);
-    scheduler.handle(new NodeAddedSchedulerEvent(node));
-    rmNodes.add(node);
-  }
-
-  private void sendNodeUpdateEvents() {
+  private void sendEnoughNodeUpdatesToAssignFully() {
     for (RMNode node : rmNodes) {
       NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
           new NodeUpdateSchedulerEvent(node);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a69763b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
new file mode 100644
index 0000000..36ee685
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Tests to verify fairshare and minshare preemption, using parameterization.
+ */
+@RunWith(Parameterized.class)
+public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
+  private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
+
+  // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
+  private static final int NODE_CAPACITY_MULTIPLE = 4;
+
+  private final boolean fairsharePreemption;
+
+  // App that takes up the entire cluster
+  private FSAppAttempt greedyApp;
+
+  // Starving app that is expected to instigate preemption
+  private FSAppAttempt starvingApp;
+
+  @Parameterized.Parameters
+  public static Collection<Boolean[]> getParameters() {
+    return Arrays.asList(new Boolean[][] {
+        {true}, {false}});
+  }
+
+  public TestFairSchedulerPreemption(Boolean fairshare) throws IOException {
+    fairsharePreemption = fairshare;
+    writeAllocFile();
+  }
+
+  @Before
+  public void setup() {
+    createConfiguration();
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
+        ALLOC_FILE.getAbsolutePath());
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+    conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
+  }
+
+  @After
+  public void teardown() {
+    ALLOC_FILE.delete();
+    conf = null;
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+  }
+
+  private void writeAllocFile() throws IOException {
+    /*
+     * Queue hierarchy:
+     * root
+     * |--- preemptable
+     *      |--- child-1
+     *      |--- child-2
+     * |--- nonpreemptable
+     *      |--- child-1
+     *      |--- child-2
+     */
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+
+    out.println("<queue name=\"preemptable\">");
+    writePreemptionParams(out);
+
+    // Child-1
+    out.println("<queue name=\"child-1\">");
+    writeResourceParams(out);
+    out.println("</queue>");
+
+    // Child-2
+    out.println("<queue name=\"child-2\">");
+    writeResourceParams(out);
+    out.println("</queue>");
+
+    out.println("</queue>"); // end of preemptable queue
+
+    // Queue with preemption disallowed
+    out.println("<queue name=\"nonpreemptable\">");
+    out.println("<allowPreemptionFrom>false" +
+        "</allowPreemptionFrom>");
+    writePreemptionParams(out);
+
+    // Child-1
+    out.println("<queue name=\"child-1\">");
+    writeResourceParams(out);
+    out.println("</queue>");
+
+    // Child-2
+    out.println("<queue name=\"child-2\">");
+    writeResourceParams(out);
+    out.println("</queue>");
+
+    out.println("</queue>"); // end of nonpreemptable queue
+
+    out.println("</allocations>");
+    out.close();
+
+    assertTrue("Allocation file does not exist, not running the test",
+        ALLOC_FILE.exists());
+  }
+
+  private void writePreemptionParams(PrintWriter out) {
+    if (fairsharePreemption) {
+      out.println("<fairSharePreemptionThreshold>1" +
+          "</fairSharePreemptionThreshold>");
+      out.println("<fairSharePreemptionTimeout>0" +
+          "</fairSharePreemptionTimeout>");
+    } else {
+      out.println("<minSharePreemptionTimeout>0" +
+          "</minSharePreemptionTimeout>");
+    }
+  }
+
+  private void writeResourceParams(PrintWriter out) {
+    if (!fairsharePreemption) {
+      out.println("<minResources>4096mb,4vcores</minResources>");
+    }
+  }
+
+  private void setupCluster() throws IOException {
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+    // Create and add two nodes to the cluster
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+    addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
+  }
+
+  private void sendEnoughNodeUpdatesToAssignFully() {
+    for (RMNode node : rmNodes) {
+      NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
+          new NodeUpdateSchedulerEvent(node);
+      for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
+        scheduler.handle(nodeUpdateSchedulerEvent);
+      }
+    }
+  }
+
+  /**
+   * Submit an application to {@code queue1} that takes over the entire
+   * cluster, then submit a second application with larger containers to
+   * {@code queue2} that requires preemption from the first application.
+   *
+   * @param queue1 first queue
+   * @param queue2 second queue
+   * @throws InterruptedException if interrupted while waiting
+   */
+  private void submitApps(String queue1, String queue2)
+      throws InterruptedException {
+    // Create an app that takes up all the resources on the cluster
+    ApplicationAttemptId appAttemptId1
+        = createSchedulingRequest(1024, 1, queue1, "default",
+        NODE_CAPACITY_MULTIPLE * rmNodes.size());
+    greedyApp = scheduler.getSchedulerApp(appAttemptId1);
+    scheduler.update();
+    sendEnoughNodeUpdatesToAssignFully();
+    assertEquals(8, greedyApp.getLiveContainers().size());
+
+    // Create a starving app that needs resources held by the first app
+    ApplicationAttemptId appAttemptId2
+        = createSchedulingRequest(2048, 2, queue2, "default",
+        NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
+    starvingApp = scheduler.getSchedulerApp(appAttemptId2);
+
+    // Sleep long enough to pass the preemption timeouts
+    Thread.sleep(10);
+
+    scheduler.update();
+  }
+
+  private void verifyPreemption() throws InterruptedException {
+    // Wait for four containers to be preempted. Note that the starved app
+    // must be queued four times for containers to be preempted.
+    for (int i = 0; i < 10000; i++) {
+      if (greedyApp.getLiveContainers().size() == 4) {
+        break;
+      }
+      Thread.sleep(10);
+    }
+
+    // Verify the right amount of containers are preempted from greedyApp
+    assertEquals(4, greedyApp.getLiveContainers().size());
+
+    sendEnoughNodeUpdatesToAssignFully();
+
+    // Verify the preempted containers are assigned to starvingApp
+    assertEquals(2, starvingApp.getLiveContainers().size());
+  }
+
+  private void verifyNoPreemption() throws InterruptedException {
+    // Wait long enough to ensure not even one container is preempted.
+    for (int i = 0; i < 600; i++) {
+      if (greedyApp.getLiveContainers().size() != 8) {
+        break;
+      }
+      Thread.sleep(10);
+    }
+    assertEquals(8, greedyApp.getLiveContainers().size());
+  }
+
+  @Test
+  public void testPreemptionWithinSameLeafQueue() throws Exception {
+    setupCluster();
+    String queue = "root.preemptable.child-1";
+    submitApps(queue, queue);
+    if (fairsharePreemption) {
+      verifyPreemption();
+    } else {
+      verifyNoPreemption();
+    }
+  }
+
+  @Test
+  public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
+    setupCluster();
+    submitApps("root.preemptable.child-1", "root.preemptable.child-2");
+    verifyPreemption();
+  }
+
+  @Test
+  public void testPreemptionBetweenNonSiblingQueues() throws Exception {
+    setupCluster();
+    submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
+    verifyPreemption();
+  }
+
+  @Test
+  public void testNoPreemptionFromDisallowedQueue() throws Exception {
+    setupCluster();
+    submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
+    verifyNoPreemption();
+  }
+}

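For reference, with the fairshare-preemption parameter the writer above emits an allocations file along these lines (reconstructed from the println calls; the minshare variant instead emits minSharePreemptionTimeout at the queue level plus minResources in each child):

<?xml version="1.0"?>
<allocations>
<queue name="preemptable">
<fairSharePreemptionThreshold>1</fairSharePreemptionThreshold>
<fairSharePreemptionTimeout>0</fairSharePreemptionTimeout>
<queue name="child-1">
</queue>
<queue name="child-2">
</queue>
</queue>
<queue name="nonpreemptable">
<allowPreemptionFrom>false</allowPreemptionFrom>
<fairSharePreemptionThreshold>1</fairSharePreemptionThreshold>
<fairSharePreemptionTimeout>0</fairSharePreemptionTimeout>
<queue name="child-1">
</queue>
<queue name="child-2">
</queue>
</queue>
</allocations>
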
