YARN-5731. Preemption calculation is not accurate when reserved containers are present in queue. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cf0d0844
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cf0d0844
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cf0d0844

Branch: refs/heads/HDFS-7240
Commit: cf0d0844d6ae25d537391edb9b65fca05d1848e6
Parents: e15e271
Author: Sunil G <sun...@apache.org>
Authored: Thu Jul 13 16:48:29 2017 +0530
Committer: Sunil G <sun...@apache.org>
Committed: Thu Jul 13 16:48:29 2017 +0530

----------------------------------------------------------------------
 .../capacity/FifoCandidatesSelector.java        |  6 +-
 .../ProportionalCapacityPreemptionPolicy.java   | 22 ++++-
 .../CapacitySchedulerPreemptionTestBase.java    |  7 +-
 ...TestCapacitySchedulerSurgicalPreemption.java | 97 +++++++++++++++++++-
 4 files changed, 125 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf0d0844/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index f4d7e92..f843db4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -43,12 +43,12 @@ public class FifoCandidatesSelector
       LogFactory.getLog(FifoCandidatesSelector.class);
   private PreemptableResourceCalculator preemptableAmountCalculator;
 
-  FifoCandidatesSelector(
-      CapacitySchedulerPreemptionContext preemptionContext) {
+  FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
+      boolean includeReservedResource) {
     super(preemptionContext);
 
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext, false);
+        preemptionContext, includeReservedResource);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf0d0844/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 76d6637..719d2eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -232,7 +232,27 @@ public class ProportionalCapacityPreemptionPolicy
     }
 
     // initialize candidates preemption selection policies
-    candidatesSelectionPolicies.add(new FifoCandidatesSelector(this));
+    // When selecting candidates for reserved containers is enabled, exclude
+    // reserved resource in the FIFO policy (less aggressive). Otherwise,
+    // include reserved resource.
+    //
+    // Why do this? In YARN-4390 we added preemption-based-on-reserved-container
+    // support. To reduce unnecessary preemption of large containers, we do not
+    // include reserved resources while calculating ideal-allocation in
+    // FifoCandidatesSelector.
+    //
+    // The YARN-4390 changes significantly reduce the number of containers
+    // preempted when the cluster has heterogeneous container requests. (Please
+    // check the test report:
+    // https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf)
+    //
+    // However, in some corner cases, especially on a fragmented cluster, they
+    // can prevent preemption from kicking in at all. Please see YARN-5731.
+    //
+    // To solve that problem, we include reserved resource when surgical
+    // preemption for reserved containers is disabled, which reverts to the
+    // pre-YARN-4390 behavior.
+    candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
+        !selectCandidatesForResevedContainers));
 
     // Do we need to specially consider intra queue
     boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(

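For context on the flag being wired through here: the boolean passed to
FifoCandidatesSelector is the negation of the reserved-container selector
switch. A minimal sketch of the resulting decision, assuming only the
configuration constant that already appears in this patch (the local variable
name and the default value below are illustrative, not part of the commit):

    // Sketch only -- condensed from the logic added above, not part of the patch.
    boolean selectForReserved = csConfig.getBoolean(
        CapacitySchedulerConfiguration
            .PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
        false); // illustrative default
    // true  -> FIFO excludes reserved resource from ideal-allocation
    //          (YARN-4390 behavior, less aggressive preemption)
    // false -> FIFO includes reserved resource, so demand stuck in
    //          reservations can still trigger preemption (this fix)
    candidatesSelectionPolicies.add(
        new FifoCandidatesSelector(this, !selectForReserved));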
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf0d0844/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
index 943b7d2..55ccb8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -131,9 +131,10 @@ public class CapacitySchedulerPreemptionTestBase {
   public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
       ApplicationAttemptId appId, int expected) throws InterruptedException {
     int waitNum = 0;
+    int total = 0;
 
     while (waitNum < 500) {
-      int total = 0;
+      total = 0;
       for (RMContainer c : node.getCopiedListOfRunningContainers()) {
         if (c.getApplicationAttemptId().equals(appId)) {
           total++;
@@ -146,7 +147,9 @@ public class CapacitySchedulerPreemptionTestBase {
       waitNum++;
     }
 
-    Assert.fail();
+    Assert.fail(
+        "Check #live-container-on-node-from-app, actual=" + total + " 
expected="
+            + expected);
   }
 
   public void checkNumberOfPreemptionCandidateFromApp(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf0d0844/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index 4a37bef..afd2f82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -36,11 +36,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
 
 public class TestCapacitySchedulerSurgicalPreemption
@@ -811,4 +811,99 @@ public class TestCapacitySchedulerSurgicalPreemption
     rm1.close();
   }
 
+  @Test(timeout = 60000)
+  public void testPreemptionForFragmentatedCluster() throws Exception {
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+        false);
+
+    /**
+     * Two queues, a and b, each with 50% of the cluster capacity.
+     * Five nodes in the cluster, 30 GB each.
+     *
+     * Submit the first app: AM = 3 GB, plus 4 * 21 GB containers.
+     * Submit the second app: AM = 3 GB, plus 4 * 21 GB containers.
+     *
+     * We expect one container to be preempted from the first app.
+     */
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
+        this.conf);
+    conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        1024 * 21);
+    conf.setQueues("root", new String[] { "a", "b" });
+    conf.setCapacity("root.a", 50);
+    conf.setUserLimitFactor("root.a", 100);
+    conf.setCapacity("root.b", 50);
+    conf.setUserLimitFactor("root.b", 100);
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    List<MockNM> nms = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB));
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // launch an app in queue a, AM container will be launched on nms[0]
+    RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0));
+
+    am1.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
+
+    // Do allocation for all nodes
+    for (int i = 0; i < 10; i++) {
+      MockNM mockNM = nms.get(i % nms.size());
+      RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    }
+
+    // App1 should have 5 containers now
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
+
+    // launch an app in queue b, AM container will be launched on nms[2]
+    RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2));
+
+    am2.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
+
+    // Do allocation for all nodes
+    for (int i = 0; i < 10; i++) {
+      MockNM mockNM = nms.get(i % nms.size());
+      RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    }
+
+    // App2 should have 2 containers now
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
+
+    // Call editSchedule twice, then run allocation; one more container should get allocated
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    int tick = 0;
+    while (schedulerApp2.getLiveContainers().size() != 4 && tick < 10) {
+      // Do allocation for all nodes
+      for (int i = 0; i < 10; i++) {
+        MockNM mockNM = nms.get(i % nms.size());
+        RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+      }
+      tick++;
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(3, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
+
+
 }

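For readers reconstructing the scenario in the new test, the arithmetic behind
it (derived from the test code above; exact container placement may vary by
node, but the totals hold):

- Cluster: 5 nodes * 30 GB = 150 GB; queues a and b are each guaranteed 75 GB.
- App1 (queue a): 3 GB AM + 4 * 21 GB containers = 87 GB, over its 75 GB
  guarantee.
- App2 (queue b): 3 GB AM + one 21 GB container = 24 GB live, with three
  21 GB requests still pending, one of them reserved.
- Free space: 150 - 87 - 24 = 39 GB, but no single node has 21 GB free, so
  app2 cannot place another container -- the fragmented case.
- With the reserved-container selector disabled, this patch makes the FIFO
  selector count that reservation, so exactly one 21 GB container is
  preempted from app1; the freed node then fits the reserved container, and
  app2 grows from 2 to 3 live containers, matching the assertions.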
