YARN-4462. FairScheduler: Disallow preemption from a queue. (Tao Jie via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fb238d7e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fb238d7e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fb238d7e

Branch: refs/heads/HDFS-7240
Commit: fb238d7e5dcd96466c8938b13ca7f13cedecb08a
Parents: 2e8ab3d
Author: Karthik Kambatla <ka...@apache.org>
Authored: Wed Jan 27 11:47:29 2016 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Wed Jan 27 12:29:06 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../scheduler/fair/AllocationConfiguration.java |  11 +-
 .../fair/AllocationFileLoaderService.java       |  16 +-
 .../scheduler/fair/FSParentQueue.java           |   8 +
 .../resourcemanager/scheduler/fair/FSQueue.java |  11 +-
 .../webapp/FairSchedulerPage.java               |   1 +
 .../webapp/dao/FairSchedulerQueueInfo.java      |   7 +
 .../scheduler/fair/TestFairScheduler.java       | 327 +++++++++++++++++++
 8 files changed, 377 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2fbecdb..2fae034 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -67,6 +67,8 @@ Release 2.9.0 - UNRELEASED
     YARN-1856. Added cgroups based memory monitoring for containers as another
     alternative to custom memory-monitoring. (Varun Vasudev via vinodkv)
 
+    YARN-4462. FairScheduler: Disallow preemption from a queue. (Tao Jie via 
kasha)
+
   IMPROVEMENTS
 
     YARN-4072. ApplicationHistoryServer, WebAppProxyServer, NodeManager and

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index bf4eae8..180ae49 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -98,6 +98,8 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
   // Reservation system configuration
   private ReservationQueueConfiguration globalReservationQueueConfig;
 
+  private final Set<String> nonPreemptableQueues;
+
   public AllocationConfiguration(Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources,
       Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
@@ -114,7 +116,8 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
       QueuePlacementPolicy placementPolicy,
       Map<FSQueueType, Set<String>> configuredQueues,
       ReservationQueueConfiguration globalReservationQueueConfig,
-      Set<String> reservableQueues) {
+      Set<String> reservableQueues,
+      Set<String> nonPreemptableQueues) {
     this.minQueueResources = minQueueResources;
     this.maxQueueResources = maxQueueResources;
     this.queueMaxApps = queueMaxApps;
@@ -135,6 +138,7 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
     this.globalReservationQueueConfig = globalReservationQueueConfig;
     this.placementPolicy = placementPolicy;
     this.configuredQueues = configuredQueues;
+    this.nonPreemptableQueues = nonPreemptableQueues;
   }
   
   public AllocationConfiguration(Configuration conf) {
@@ -161,6 +165,7 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
     }
     placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
         configuredQueues);
+    nonPreemptableQueues = new HashSet<String>();
   }
   
   /**
@@ -210,6 +215,10 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
         -1f : fairSharePreemptionThreshold;
   }
 
+  public boolean isPreemptable(String queueName) {
+    return !nonPreemptableQueues.contains(queueName);
+  }
+
   public ResourceWeights getQueueWeight(String queue) {
     ResourceWeights weight = queueWeights.get(queue);
     return (weight == null) ? ResourceWeights.NEUTRAL : weight;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index 9a31be3..d6012af 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -224,6 +224,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
     Map<String, Map<QueueACL, AccessControlList>> queueAcls =
         new HashMap<String, Map<QueueACL, AccessControlList>>();
     Set<String> reservableQueues = new HashSet<String>();
+    Set<String> nonPreemptableQueues = new HashSet<String>();
     int userMaxAppsDefault = Integer.MAX_VALUE;
     int queueMaxAppsDefault = Integer.MAX_VALUE;
     Resource queueMaxResourcesDefault = Resources.unbounded();
@@ -360,7 +361,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
           queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
           queuePolicies, minSharePreemptionTimeouts, 
fairSharePreemptionTimeouts,
           fairSharePreemptionThresholds, queueAcls, configuredQueues,
-          reservableQueues);
+          reservableQueues, nonPreemptableQueues);
     }
 
     // Load placement policy and pass it configured queues
@@ -409,7 +410,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
         defaultSchedPolicy, minSharePreemptionTimeouts,
         fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
         newPlacementPolicy, configuredQueues, globalReservationQueueConfig,
-        reservableQueues);
+        reservableQueues, nonPreemptableQueues);
     
     lastSuccessfulReload = clock.getTime();
     lastReloadAttemptFailed = false;
@@ -431,7 +432,8 @@ public class AllocationFileLoaderService extends 
AbstractService {
       Map<String, Float> fairSharePreemptionThresholds,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls,
       Map<FSQueueType, Set<String>> configuredQueues,
-      Set<String> reservableQueues)
+      Set<String> reservableQueues,
+      Set<String> nonPreemptableQueues)
       throws AllocationConfigurationException {
     String queueName = element.getAttribute("name").trim();
 
@@ -508,13 +510,19 @@ public class AllocationFileLoaderService extends 
AbstractService {
         isLeaf = false;
         reservableQueues.add(queueName);
         configuredQueues.get(FSQueueType.PARENT).add(queueName);
+      } else if ("allowPreemptionFrom".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        if (!Boolean.parseBoolean(text)) {
+          nonPreemptableQueues.add(queueName);
+        }
       } else if ("queue".endsWith(field.getTagName()) || 
           "pool".equals(field.getTagName())) {
         loadQueue(queueName, field, minQueueResources, maxQueueResources,
             queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
             queuePolicies, minSharePreemptionTimeouts,
             fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
-            queueAcls, configuredQueues, reservableQueues);
+            queueAcls, configuredQueues, reservableQueues,
+            nonPreemptableQueues);
         isLeaf = false;
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index febe050..a028422 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -260,6 +260,14 @@ public class FSParentQueue extends FSQueue {
     readLock.lock();
     try {
       for (FSQueue queue : childQueues) {
+        // Skip child queues that are configured as non-preemptable
+        if (!queue.isPreemptable()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("skipping from queue=" + queue.getName()
+                + " because it's a non-preemptable queue");
+          }
+          continue;
+        }
         if (candidateQueue == null ||
             comparator.compare(queue, candidateQueue) > 0) {
           candidateQueue = queue;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 713bdca..f82411d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -62,6 +62,7 @@ public abstract class FSQueue implements Queue, Schedulable {
   private long fairSharePreemptionTimeout = Long.MAX_VALUE;
   private long minSharePreemptionTimeout = Long.MAX_VALUE;
   private float fairSharePreemptionThreshold = 0.5f;
+  private boolean preemptable = true;
 
   public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
     this.name = name;
@@ -235,6 +236,10 @@ public abstract class FSQueue implements Queue, 
Schedulable {
     this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
   }
 
+  public boolean isPreemptable() {
+    return preemptable;
+  }
+
   /**
    * Recomputes the shares for all child queues and applications based on this
    * queue's current share
@@ -242,7 +247,8 @@ public abstract class FSQueue implements Queue, Schedulable 
{
   public abstract void recomputeShares();
 
   /**
-   * Update the min/fair share preemption timeouts and threshold for this 
queue.
+   * Update the min/fair share preemption timeouts, threshold and preemption
+   * disabled flag for this queue.
    */
   public void updatePreemptionVariables() {
     // For min share timeout
@@ -263,6 +269,9 @@ public abstract class FSQueue implements Queue, Schedulable 
{
     if (fairSharePreemptionThreshold < 0 && parent != null) {
       fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
     }
+    // Whether preemption from this queue is allowed
+    preemptable = scheduler.getAllocationConfiguration()
+        .isPreemptable(getName());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
index 5ff9422..689622f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
@@ -81,6 +81,7 @@ public class FairSchedulerPage extends RmView {
       }
       ri._(STEADY_FAIR_SHARE + ":", qinfo.getSteadyFairShare().toString());
       ri._(INSTANTANEOUS_FAIR_SHARE + ":", qinfo.getFairShare().toString());
+      ri._("Preemptable:", qinfo.isPreemptable());
       html._(InfoBlock.class);
 
       // clear the info contents so this queue's info doesn't accumulate into 
another queue's info

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
index ee37f18..e02df65 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
@@ -65,6 +65,8 @@ public class FairSchedulerQueueInfo {
   private String queueName;
   private String schedulingPolicy;
 
+  private boolean preemptable;
+
   private FairSchedulerQueueInfoList childQueues;
 
   public FairSchedulerQueueInfo() {
@@ -108,6 +110,7 @@ public class FairSchedulerQueueInfo {
       return;
     }
 
+    preemptable = queue.isPreemptable();
     childQueues = getChildQueues(queue, scheduler);
   }
 
@@ -228,4 +231,8 @@ public class FairSchedulerQueueInfo {
     return childQueues != null ? childQueues.getQueueInfoList() :
         new ArrayList<FairSchedulerQueueInfo>();
   }
+
+  public boolean isPreemptable() {
+    return preemptable;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb238d7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 1b1418a..fac28b7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -2493,6 +2493,333 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
   }
 
   @Test
+  /**
+   * Tests the decision to preempt tasks with respect to non-preemptable queues
+   * 1, Queues as follows:
+   *   queueA(non-preemptable)
+   *   queueB(preemptable)
+   *   parentQueue(non-preemptable)
+   *     --queueC(preemptable)
+   *   queueD(preemptable)
+   * 2, Submit request to queueA, queueB, queueC, and all of them are over 
MinShare
+   * 3, Now all resource are occupied
+   * 4, Submit request to queueD, and need to preempt resource from other 
queues
+   * 5, Only preemptable queue(queueB) would be preempted.
+   */
+  public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception 
{
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"parentQueue\">");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
+    out.println("</queue>");
+    
out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    
out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create four nodes(3G each)
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
+            "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    RMNode node4 =
+        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
+            "127.0.0.4");
+    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+    scheduler.handle(nodeEvent4);
+
+    // Submit apps to queueA, queueB, queueC,
+    // now all resource of the cluster is occupied
+    ApplicationAttemptId app1 =
+        createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
+    ApplicationAttemptId app2 =
+        createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
+    ApplicationAttemptId app3 =
+        createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3);
+
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new 
NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new 
NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new 
NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new 
NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+
+    assertEquals(4, 
scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(4, 
scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(4, 
scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+    // Now new requests arrive from queueD
+    ApplicationAttemptId app4 =
+        createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1);
+    scheduler.update();
+    FSLeafQueue schedD =
+        scheduler.getQueueManager().getLeafQueue("queueD", true);
+
+    // After minSharePreemptionTime has passed, 2G of resources should be
+    // preempted from queueB to queueD
+    clock.tickSec(6);
+    assertEquals(2048,
+        scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
+
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    // now only app2 is selected to be preempted
+    assertTrue("App2 should have container to be preempted",
+        !Collections.disjoint(
+            scheduler.getSchedulerApp(app2).getLiveContainers(),
+            scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+    assertTrue("App1 should not have container to be preempted",
+        Collections.disjoint(
+            scheduler.getSchedulerApp(app1).getLiveContainers(),
+            scheduler.getSchedulerApp(app1).getPreemptionContainers()));
+    assertTrue("App3 should not have container to be preempted",
+        Collections.disjoint(
+            scheduler.getSchedulerApp(app3).getLiveContainers(),
+            scheduler.getSchedulerApp(app3).getPreemptionContainers()));
+    // Pretend 20 seconds have passed
+    clock.tickSec(20);
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new 
NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new 
NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new 
NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new 
NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+    // after preemption
+    assertEquals(4, 
scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(2, 
scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(4, 
scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(2, 
scheduler.getSchedulerApp(app4).getLiveContainers().size());
+  }
+
+  @Test
+  /**
+   * Tests the decision to preempt tasks when allowPreemptionFrom is set to
+   * false on all queues.
+   * Then none of them would actually be preempted.
+   * 1, Queues as follows:
+   *   queueA(non-preemptable)
+   *   queueB(non-preemptable)
+   *   parentQueue1(non-preemptable)
+   *     --queueC(preemptable)
+   *   parentQueue2(preemptable)
+   *     --queueD(non-preemptable)
+   * 2, Submit request to queueB, queueC, queueD, and all of them are over 
MinShare
+   * 3, Now all resource are occupied
+   * 4, Submit request to queueA, and need to preempt resource from other 
queues
+   * 5, None of queues would be preempted.
+   */
+  public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
+      throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    ControlledClock clock = new ControlledClock();
+    scheduler.setClock(clock);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>2048mb,0vcores</minResources>");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("</queue>");
+    out.println("<queue name=\"parentQueue1\">");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("<queue name=\"queueC\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<queue name=\"parentQueue2\">");
+    out.println("<queue name=\"queueD\">");
+    out.println("<weight>.25</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
+    out.println("</queue>");
+    out.println("</queue>");
+    
out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    
out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create four nodes(3G each)
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
+            "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    RMNode node4 =
+        MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
+            "127.0.0.4");
+    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+    scheduler.handle(nodeEvent4);
+
+    // Submit apps to queueB, queueC, queueD
+    // now all resource of the cluster is occupied
+
+    ApplicationAttemptId app1 =
+        createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1);
+    ApplicationAttemptId app2 =
+        createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 
2);
+    ApplicationAttemptId app3 =
+        createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 
3);
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new 
NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new 
NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new 
NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new 
NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+
+    assertEquals(4, 
scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(4, 
scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(4, 
scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+    // Now new requests arrive from queueA
+    ApplicationAttemptId app4 =
+        createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
+    scheduler.update();
+    FSLeafQueue schedA =
+        scheduler.getQueueManager().getLeafQueue("queueA", true);
+
+    // After minSharePreemptionTime has passed, resource deficit is 2G
+    clock.tickSec(6);
+    assertEquals(2048,
+        scheduler.resourceDeficit(schedA, clock.getTime()).getMemory());
+
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    // now no app is selected to be preempted
+    assertTrue("App1 should not have container to be preempted",
+        Collections.disjoint(
+            scheduler.getSchedulerApp(app1).getLiveContainers(),
+            scheduler.getSchedulerApp(app1).getPreemptionContainers()));
+    assertTrue("App2 should not have container to be preempted",
+        Collections.disjoint(
+            scheduler.getSchedulerApp(app2).getLiveContainers(),
+            scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+    assertTrue("App3 should not have container to be preempted",
+        Collections.disjoint(
+            scheduler.getSchedulerApp(app3).getLiveContainers(),
+            scheduler.getSchedulerApp(app3).getPreemptionContainers()));
+    // Pretend 20 seconds have passed
+    clock.tickSec(20);
+    scheduler.preemptResources(Resources.createResource(2 * 1024));
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new 
NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+
+      NodeUpdateSchedulerEvent nodeUpdate2 = new 
NodeUpdateSchedulerEvent(node2);
+      scheduler.handle(nodeUpdate2);
+
+      NodeUpdateSchedulerEvent nodeUpdate3 = new 
NodeUpdateSchedulerEvent(node3);
+      scheduler.handle(nodeUpdate3);
+
+      NodeUpdateSchedulerEvent nodeUpdate4 = new 
NodeUpdateSchedulerEvent(node4);
+      scheduler.handle(nodeUpdate4);
+    }
+    // after preemption
+    assertEquals(4, 
scheduler.getSchedulerApp(app1).getLiveContainers().size());
+    assertEquals(4, 
scheduler.getSchedulerApp(app2).getLiveContainers().size());
+    assertEquals(4, 
scheduler.getSchedulerApp(app3).getLiveContainers().size());
+    assertEquals(0, 
scheduler.getSchedulerApp(app4).getLiveContainers().size());
+  }
+
+  @Test
   public void testBackwardsCompatiblePreemptionConfiguration() throws 
Exception {
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
 

Reply via email to