YARN-7574. Add support for Node Labels on Auto Created Leaf Queue Template. 
Contributed by Suma Shivaprasad.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/821b0de4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/821b0de4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/821b0de4

Branch: refs/heads/HDFS-7240
Commit: 821b0de4c59156d4a65112de03ba3e7e1c88e309
Parents: 5700556
Author: Sunil G <sun...@apache.org>
Authored: Mon Apr 9 21:17:22 2018 +0530
Committer: Sunil G <sun...@apache.org>
Committed: Mon Apr 9 21:17:22 2018 +0530

----------------------------------------------------------------------
 .../server/resourcemanager/RMServerUtils.java   |   5 +-
 .../rmapp/attempt/RMAppAttemptImpl.java         |  47 ++
 .../resourcemanager/scheduler/Allocation.java   |  12 +
 .../scheduler/SchedulerUtils.java               |  33 +-
 .../capacity/AutoCreatedLeafQueue.java          |   3 +-
 .../AutoCreatedQueueManagementPolicy.java       |  12 +-
 .../scheduler/capacity/CapacityScheduler.java   |   2 +
 .../CapacitySchedulerConfiguration.java         |  28 +
 .../scheduler/capacity/LeafQueue.java           |  11 +
 .../scheduler/capacity/ManagedParentQueue.java  |   5 +-
 .../GuaranteedOrZeroCapacityOverTimePolicy.java | 573 +++++++++++--------
 .../placement/PendingAskUpdateResult.java       |   8 +
 .../yarn/server/resourcemanager/MockNM.java     |  15 +
 .../server/resourcemanager/TestAppManager.java  |  20 +-
 ...stCapacitySchedulerAutoCreatedQueueBase.java | 241 +++++---
 .../TestCapacitySchedulerAutoQueueCreation.java | 233 +++++---
 .../TestQueueManagementDynamicEditPolicy.java   |  30 +-
 17 files changed, 834 insertions(+), 444 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 33451295..ab6bbcf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -236,13 +236,14 @@ public class RMServerUtils {
    */
   public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
       Resource maximumResource, String queueName, YarnScheduler scheduler,
-      RMContext rmContext)
-      throws InvalidResourceRequestException {
+      RMContext rmContext) throws InvalidResourceRequestException {
     // Get queue from scheduler
     QueueInfo queueInfo = null;
     try {
       queueInfo = scheduler.getQueueInfo(queueName, false, false);
     } catch (IOException e) {
+      //Queue may not exist since it could be auto-created in case of
+      // dynamic queues
     }
 
     for (ResourceRequest resReq : ask) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index c23b135..1b1e2c4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
@@ -75,6 +76,9 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import 
org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
+
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
+    .RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
@@ -1109,6 +1113,49 @@ public class RMAppAttemptImpl implements RMAppAttempt, 
Recoverable {
               amBlacklist.getBlacklistAdditions() + ") and removals(" +
               amBlacklist.getBlacklistRemovals() + ")");
         }
+
+        QueueInfo queueInfo = null;
+        for (ResourceRequest amReq : appAttempt.amReqs) {
+          if (amReq.getNodeLabelExpression() == null && ResourceRequest.ANY
+              .equals(amReq.getResourceName())) {
+            String queue = appAttempt.rmApp.getQueue();
+
+            //Load queue only once since queue will be same across attempts
+            if (queueInfo == null) {
+              try {
+                queueInfo = appAttempt.scheduler.getQueueInfo(queue, false,
+                    false);
+              } catch (IOException e) {
+                LOG.error("Could not find queue for application : ", e);
+                // Set application status to REJECTED since we can't find the
+                // queue
+                appAttempt.rmContext.getDispatcher().getEventHandler().handle(
+                    new RMAppAttemptEvent(appAttempt.getAppAttemptId(),
+                        RMAppAttemptEventType.FAIL,
+                        "Could not find queue for application : " +
+                        appAttempt.rmApp.getQueue()));
+                appAttempt.rmContext.getDispatcher().getEventHandler().handle(
+                    new RMAppEvent(appAttempt.rmApp.getApplicationId(), 
RMAppEventType
+                        .APP_REJECTED,
+                        "Could not find queue for application : " +
+                            appAttempt.rmApp.getQueue()));
+                return RMAppAttemptState.FAILED;
+              }
+            }
+
+            String labelExp = RMNodeLabelsManager.NO_LABEL;
+            if (queueInfo != null) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Setting default node label expression : " + 
queueInfo
+                    .getDefaultNodeLabelExpression());
+              }
+              labelExp = queueInfo.getDefaultNodeLabelExpression();
+            }
+
+            amReq.setNodeLabelExpression(labelExp);
+          }
+        }
+
         // AM resource has been checked when submission
         Allocation amContainerAllocation =
             appAttempt.scheduler.allocate(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
index 768afde..9573ac8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
@@ -132,4 +132,16 @@ public class Allocation {
   public void setResourceLimit(Resource resource) {
     this.resourceLimit = resource;
   }
+
+  @Override
+  public String toString() {
+    return "Allocation{" + "containers=" + containers + ", strictContainers="
+        + strictContainers + ", fungibleContainers=" + fungibleContainers
+        + ", fungibleResources=" + fungibleResources + ", nmTokens=" + nmTokens
+        + ", increasedContainers=" + increasedContainers
+        + ", decreasedContainers=" + decreasedContainers
+        + ", promotedContainers=" + promotedContainers + ", demotedContainers="
+        + demotedContainers + ", previousAttemptContainers="
+        + previousAttemptContainers + ", resourceLimit=" + resourceLimit + '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 27563d6..c0d7d86 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -186,19 +186,33 @@ public class SchedulerUtils {
       ResourceRequest resReq, QueueInfo queueInfo) {
 
     String labelExp = resReq.getNodeLabelExpression();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Requested Node Label Expression : " + labelExp);
+      LOG.debug("Queue Info : " + queueInfo);
+    }
 
     // if queue has default label expression, and RR doesn't have, use the
     // default label expression of queue
     if (labelExp == null && queueInfo != null && ResourceRequest.ANY
         .equals(resReq.getResourceName())) {
+      if ( LOG.isDebugEnabled()) {
+        LOG.debug("Setting default node label expression : " + queueInfo
+            .getDefaultNodeLabelExpression());
+      }
       labelExp = queueInfo.getDefaultNodeLabelExpression();
     }
 
-    // If labelExp still equals to null, set it to be NO_LABEL
-    if (labelExp == null) {
+    // If labelExp still equals to null, it could either be a dynamic queue
+    // or the label is not configured
+    // set it to be NO_LABEL in case of a pre-configured queue. Dynamic
+    // queues are handled in RMAppAttemptImpl.ScheduledTransition
+    if (labelExp == null && queueInfo != null) {
       labelExp = RMNodeLabelsManager.NO_LABEL;
     }
-    resReq.setNodeLabelExpression(labelExp);
+
+    if ( labelExp != null) {
+      resReq.setNodeLabelExpression(labelExp);
+    }
   }
 
   public static void normalizeAndValidateRequest(ResourceRequest resReq,
@@ -209,6 +223,7 @@ public class SchedulerUtils {
         isRecovery, rmContext, null);
   }
 
+
   public static void normalizeAndValidateRequest(ResourceRequest resReq,
       Resource maximumResource, String queueName, YarnScheduler scheduler,
       boolean isRecovery, RMContext rmContext, QueueInfo queueInfo)
@@ -233,11 +248,12 @@ public class SchedulerUtils {
       try {
         queueInfo = scheduler.getQueueInfo(queueName, false, false);
       } catch (IOException e) {
-        // it is possible queue cannot get when queue mapping is set, just 
ignore
-        // the queueInfo here, and move forward
+        //Queue may not exist since it could be auto-created in case of
+        // dynamic queues
       }
     }
     SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
+
     if (!isRecovery) {
       validateResourceRequest(resReq, maximumResource, queueInfo, rmContext);
     }
@@ -245,8 +261,7 @@ public class SchedulerUtils {
 
   public static void normalizeAndvalidateRequest(ResourceRequest resReq,
       Resource maximumResource, String queueName, YarnScheduler scheduler,
-      RMContext rmContext)
-      throws InvalidResourceRequestException {
+      RMContext rmContext) throws InvalidResourceRequestException {
     normalizeAndvalidateRequest(resReq, maximumResource, queueName, scheduler,
         rmContext, null);
   }
@@ -296,7 +311,7 @@ public class SchedulerUtils {
               + "resource request has resource name = "
               + resReq.getResourceName());
     }
-    
+
     // we don't allow specify label expression with more than one node labels 
now
     if (labelExp != null && labelExp.contains("&&")) {
       throw new InvalidLabelResourceRequestException(
@@ -305,7 +320,7 @@ public class SchedulerUtils {
               + "in a node label expression, node label expression = "
               + labelExp);
     }
-    
+
     if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) {
       if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(),
           labelExp, rmContext)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
index 8b67087..e12b55e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
@@ -148,11 +148,10 @@ public class AutoCreatedLeafQueue extends 
AbstractAutoCreatedLeafQueue {
     try {
       for( String nodeLabel : parent.getQueueCapacities().getExistingNodeLabels
           ()) {
-        //TODO - update to use getMaximumCapacity(nodeLabel) in YARN-7574
         setEntitlement(nodeLabel, new QueueEntitlement(0.0f,
             parent.getLeafQueueTemplate()
                 .getQueueCapacities()
-                .getMaximumCapacity()));
+                .getMaximumCapacity(nodeLabel)));
       }
     } catch (SchedulerDynamicEditException e) {
       throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java
index f7a4bbd..388e9d6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+
+
+import java.io.IOException;
 import java.util.List;
 
 public interface AutoCreatedQueueManagementPolicy {
@@ -26,14 +29,15 @@ public interface AutoCreatedQueueManagementPolicy {
    * Initialize policy
    * @param schedulerContext Capacity Scheduler context
    */
-  void init(CapacitySchedulerContext schedulerContext, ParentQueue 
parentQueue);
+  void init(CapacitySchedulerContext schedulerContext, ParentQueue
+      parentQueue) throws IOException;
 
   /**
    * Reinitialize policy state ( if required )
    * @param schedulerContext Capacity Scheduler context
    */
   void reinitialize(CapacitySchedulerContext schedulerContext,
-      ParentQueue parentQueue);
+      ParentQueue parentQueue) throws IOException;
 
   /**
    * Get initial template for the specified leaf queue
@@ -48,6 +52,10 @@ public interface AutoCreatedQueueManagementPolicy {
   /**
    * Compute/Adjust child queue capacities
    * for auto created leaf queues
+   * This computes queue entitlements but does not update LeafQueueState or
+   * queue capacities. Scheduler calls commitQueueManagementChanges after
+   * validation after applying queue changes and commits to LeafQueueState
+   * are done in commitQueueManagementChanges.
    *
    * @return returns a list of suggested QueueEntitlementChange(s) which may
    * or may not be be enforced by the scheduler

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e59bdde..776e512 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1204,6 +1204,8 @@ public class CapacityScheduler extends
       updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
     }
 
+    LOG.info("Allocation for application " + applicationAttemptId + " : " +
+        allocation + " with cluster resource : " + getClusterResource());
     return allocation;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 1870aef..c41bd96 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1878,6 +1878,15 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
     setCapacity(leafQueueConfPrefix, val);
   }
 
+  @VisibleForTesting
+  @Private
+  public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath,
+      String label, float val) {
+    String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
+        queuePath);
+    setCapacityByLabel(leafQueueConfPrefix, label, val);
+  }
+
   @Private
   @VisibleForTesting
   public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath,
@@ -1887,6 +1896,15 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
     setMaximumCapacity(leafQueueConfPrefix, val);
   }
 
+  @Private
+  @VisibleForTesting
+  public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
+      String label, float val) {
+    String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
+        queuePath);
+    setMaximumCapacityByLabel(leafQueueConfPrefix, label, val);
+  }
+
   @VisibleForTesting
   @Private
   public void setAutoCreatedLeafQueueConfigUserLimit(String queuePath,
@@ -1905,6 +1923,16 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
     setUserLimitFactor(leafQueueConfPrefix, val);
   }
 
+  @Private
+  @VisibleForTesting
+  public void setAutoCreatedLeafQueueConfigDefaultNodeLabelExpression(String
+      queuePath,
+      String expression) {
+    String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
+        queuePath);
+    setDefaultNodeLabelExpression(leafQueueConfPrefix, expression);
+  }
+
   public static String getUnits(String resourceValue) {
     String units;
     for (int i = 0; i < resourceValue.length(); i++) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 8d1428d..1ae8f91 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -778,6 +778,17 @@ public class LeafQueue extends AbstractCSQueue {
 
       metrics.setAMResouceLimit(nodePartition, amResouceLimit);
       queueUsage.setAMLimit(nodePartition, amResouceLimit);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Queue: " + getQueueName() + ", node label : " +
+            nodePartition
+            + ", queue "
+            + "partition "
+            + "resource : " + queuePartitionResource + ','
+            + " queue current limit : " + queueCurrentLimit + ","
+            + " queue partition usable resource : "
+            + queuePartitionUsableResource + ","
+            + " amResourceLimit : " + amResouceLimit);
+      }
       return amResouceLimit;
     } finally {
       writeLock.unlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
index cbdb21d..2494000 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
@@ -132,7 +132,7 @@ public class ManagedParentQueue extends 
AbstractManagedParentQueue {
     }
   }
 
-  private void initializeQueueManagementPolicy() {
+  private void initializeQueueManagementPolicy() throws IOException {
     queueManagementPolicy =
         csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
             getQueuePath());
@@ -140,7 +140,7 @@ public class ManagedParentQueue extends 
AbstractManagedParentQueue {
     queueManagementPolicy.init(csContext, this);
   }
 
-  private void reinitializeQueueManagementPolicy() {
+  private void reinitializeQueueManagementPolicy() throws IOException {
     AutoCreatedQueueManagementPolicy managementPolicy =
         csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
             getQueuePath());
@@ -339,6 +339,7 @@ public class ManagedParentQueue extends 
AbstractManagedParentQueue {
         ((AutoCreatedLeafQueue) childQueue).validateConfigurations(template);
         break;
       }
+
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
index aee6405..b2301fd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
@@ -50,6 +50,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -63,8 +64,6 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
-    .NO_LABEL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
     .capacity.CSQueueUtils.EPSILON;
 
@@ -85,8 +84,6 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
   private static final Log LOG = LogFactory.getLog(
       GuaranteedOrZeroCapacityOverTimePolicy.class);
 
-  private AutoCreatedLeafQueueConfig ZERO_CAPACITY_ENTITLEMENT;
-
   private ReentrantReadWriteLock.WriteLock writeLock;
 
   private ReentrantReadWriteLock.ReadLock readLock;
@@ -97,12 +94,70 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
 
   private QueueCapacities leafQueueTemplateCapacities;
 
-  private Map<String, LeafQueueState> leafQueueStateMap = new HashMap<>();
+  private Set<String> leafQueueTemplateNodeLabels;
+
+  private LeafQueueState leafQueueState = new LeafQueueState();
 
   private Clock clock = new MonotonicClock();
 
   private class LeafQueueState {
 
+    //map of partition-> queueName->{leaf queue's state}
+    private Map<String, Map<String, LeafQueueStatePerPartition>>
+        leafQueueStateMap = new HashMap<>();
+
+    public boolean containsLeafQueue(String leafQueueName, String partition) {
+      if (leafQueueStateMap.containsKey(partition)) {
+        return leafQueueStateMap.get(partition).containsKey(leafQueueName);
+      }
+      return false;
+    }
+
+    private boolean containsPartition(String partition) {
+      if (leafQueueStateMap.containsKey(partition)) {
+        return true;
+      }
+      return false;
+    }
+
+    private boolean addLeafQueueStateIfNotExists(String leafQueueName,
+        String partition, LeafQueueStatePerPartition leafQueueState) {
+      if (!containsPartition(partition)) {
+        leafQueueStateMap.put(partition, new HashMap<>());
+      }
+      if (!containsLeafQueue(leafQueueName, partition)) {
+        leafQueueStateMap.get(partition).put(leafQueueName, leafQueueState);
+        return true;
+      }
+      return false;
+    }
+
+    public boolean createLeafQueueStateIfNotExists(LeafQueue leafQueue,
+        String partition) {
+      return addLeafQueueStateIfNotExists(leafQueue.getQueueName(), partition,
+          new LeafQueueStatePerPartition());
+    }
+
+    public LeafQueueStatePerPartition getLeafQueueStatePerPartition(
+        String leafQueueName, String partition) {
+      if (leafQueueStateMap.get(partition) != null) {
+        return leafQueueStateMap.get(partition).get(leafQueueName);
+      }
+      return null;
+    }
+
+    public Map<String, Map<String, LeafQueueStatePerPartition>>
+    getLeafQueueStateMap() {
+      return leafQueueStateMap;
+    }
+
+    private void clear() {
+      leafQueueStateMap.clear();
+    }
+  }
+
+  private class LeafQueueStatePerPartition {
+
     private AtomicBoolean isActive = new AtomicBoolean(false);
 
     private long mostRecentActivationTime;
@@ -139,41 +194,16 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
     }
   }
 
-  private boolean containsLeafQueue(String leafQueueName) {
-    return leafQueueStateMap.containsKey(leafQueueName);
-  }
-
-  private boolean addLeafQueueStateIfNotExists(String leafQueueName,
-      LeafQueueState leafQueueState) {
-    if (!containsLeafQueue(leafQueueName)) {
-      leafQueueStateMap.put(leafQueueName, leafQueueState);
-      return true;
-    }
-    return false;
-  }
-
-  private boolean addLeafQueueStateIfNotExists(LeafQueue leafQueue) {
-    return addLeafQueueStateIfNotExists(leafQueue.getQueueName(),
-        new LeafQueueState());
-  }
-
-  private void clearLeafQueueState() {
-    leafQueueStateMap.clear();
-  }
-
   private class ParentQueueState {
 
     private Map<String, Float> totalAbsoluteActivatedChildQueueCapacityByLabel =
         new HashMap<String, Float>();
 
-    private float getAbsoluteActivatedChildQueueCapacity() {
-      return getAbsoluteActivatedChildQueueCapacity(NO_LABEL);
-    }
-
     private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
       try {
         readLock.lock();
-        Float totalActivatedCapacity = getByLabel(nodeLabel);
+        Float totalActivatedCapacity = getAbsActivatedChildQueueCapacityByLabel(
+            nodeLabel);
         if (totalActivatedCapacity != null) {
           return totalActivatedCapacity;
         } else{
@@ -188,11 +218,14 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
         float childQueueCapacity) {
       try {
         writeLock.lock();
-        Float activatedChildCapacity = getByLabel(nodeLabel);
+        Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
+            nodeLabel);
         if (activatedChildCapacity != null) {
-          setByLabel(nodeLabel, activatedChildCapacity + childQueueCapacity);
+          setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
+              activatedChildCapacity + childQueueCapacity);
         } else{
-          setByLabel(nodeLabel, childQueueCapacity);
+          setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
+              childQueueCapacity);
         }
       } finally {
         writeLock.unlock();
@@ -203,22 +236,25 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
         float childQueueCapacity) {
       try {
         writeLock.lock();
-        Float activatedChildCapacity = getByLabel(nodeLabel);
+        Float activatedChildCapacity = getAbsActivatedChildQueueCapacityByLabel(
+            nodeLabel);
         if (activatedChildCapacity != null) {
-          setByLabel(nodeLabel, activatedChildCapacity - childQueueCapacity);
+          setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
+              activatedChildCapacity - childQueueCapacity);
         } else{
-          setByLabel(nodeLabel, childQueueCapacity);
+          setAbsActivatedChildQueueCapacityByLabel(nodeLabel,
+              childQueueCapacity);
         }
       } finally {
         writeLock.unlock();
       }
     }
 
-    Float getByLabel(String label) {
+    Float getAbsActivatedChildQueueCapacityByLabel(String label) {
       return totalAbsoluteActivatedChildQueueCapacityByLabel.get(label);
     }
 
-    Float setByLabel(String label, float val) {
+    Float setAbsActivatedChildQueueCapacityByLabel(String label, float val) {
       return totalAbsoluteActivatedChildQueueCapacityByLabel.put(label, val);
     }
 
@@ -256,13 +292,12 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
 
   @Override
   public void init(final CapacitySchedulerContext schedulerContext,
-      final ParentQueue parentQueue) {
+      final ParentQueue parentQueue) throws IOException {
     this.scheduler = schedulerContext;
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
     writeLock = lock.writeLock();
-
     if (!(parentQueue instanceof ManagedParentQueue)) {
       throw new IllegalArgumentException(
           "Expected instance of type " + ManagedParentQueue.class);
@@ -278,15 +313,43 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
             + leafQueueTemplate.getQueueCapacities() + "]");
   }
 
-  private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) {
+  private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue)
+      throws IOException {
     leafQueueTemplate = parentQueue.getLeafQueueTemplate();
 
     leafQueueTemplateCapacities = leafQueueTemplate.getQueueCapacities();
 
-    ZERO_CAPACITY_ENTITLEMENT = buildTemplate(0.0f,
-        leafQueueTemplateCapacities.getMaximumCapacity());
+    Set<String> parentQueueLabels = parentQueue.getNodeLabelsForQueue();
+    for (String nodeLabel : leafQueueTemplateCapacities
+        .getExistingNodeLabels()) {
+
+      if (!parentQueueLabels.contains(nodeLabel)) {
+        LOG.error("Invalid node label " + nodeLabel
+            + " on configured leaf template on parent" + " queue " + parentQueue
+            .getQueueName());
+        throw new IOException("Invalid node label " + nodeLabel
+            + " on configured leaf template on parent" + " queue " + parentQueue
+            .getQueueName());
+      }
+    }
+
+    leafQueueTemplateNodeLabels =
+        leafQueueTemplateCapacities.getExistingNodeLabels();
+
   }
 
+  /**
+   * Compute/Adjust child queue capacities
+   * for auto created leaf queues
+   * This computes queue entitlements but does not update LeafQueueState or
+   * queue capacities. Scheduler calls commitQueueManagemetChanges after
+   * validation after applying queue changes and commits to LeafQueueState
+   * are done in commitQueueManagementChanges.
+   *
+   * @return List of Queue Management change suggestions which could potentially
+   * be committed/rejected by the scheduler due to validation failures
+   * @throws SchedulerDynamicEditException
+   */
   @Override
   public List<QueueManagementChange> computeQueueManagementChanges()
       throws SchedulerDynamicEditException {
@@ -298,70 +361,92 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
     try {
       readLock.lock();
       List<QueueManagementChange> queueManagementChanges = new ArrayList<>();
+      List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
+
+      //Map of LeafQueue->QueueCapacities - keep adding the computed
+      // entitlements to this map and finally
+      // build the leaf queue configuration Template for all identified leaf
+      // queues
+      Map<String, QueueCapacities> leafQueueEntitlements = new HashMap<>();
+      for (String nodeLabel : leafQueueTemplateNodeLabels) {
+        // check if any leaf queues need to be deactivated based on pending
+        // applications
+        float parentAbsoluteCapacity =
+            managedParentQueue.getQueueCapacities().getAbsoluteCapacity(
+                nodeLabel);
+        float leafQueueTemplateAbsoluteCapacity =
+            leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel);
+        Map<String, QueueCapacities> deactivatedLeafQueues =
+            deactivateLeafQueuesIfInActive(managedParentQueue, nodeLabel,
+                leafQueueEntitlements);
+
+        float deactivatedCapacity = getTotalDeactivatedCapacity(
+            deactivatedLeafQueues, nodeLabel);
+
+        float sumOfChildQueueActivatedCapacity = parentQueueState.
+            getAbsoluteActivatedChildQueueCapacity(nodeLabel);
+
+        //Check if we need to activate anything at all?
+        float availableCapacity =
+            parentAbsoluteCapacity - sumOfChildQueueActivatedCapacity
+                + deactivatedCapacity + EPSILON;
 
-      // check if any leaf queues need to be deactivated based on pending
-      // applications and
-      float parentAbsoluteCapacity =
-          managedParentQueue.getQueueCapacities().getAbsoluteCapacity();
-
-      float leafQueueTemplateAbsoluteCapacity =
-          leafQueueTemplateCapacities.getAbsoluteCapacity();
-      Map<String, QueueCapacities> deactivatedLeafQueues =
-          deactivateLeafQueuesIfInActive(managedParentQueue, queueManagementChanges);
-
-      float deactivatedCapacity = getTotalDeactivatedCapacity(
-          deactivatedLeafQueues);
-
-      float sumOfChildQueueActivatedCapacity = parentQueueState.
-          getAbsoluteActivatedChildQueueCapacity();
-
-      //Check if we need to activate anything at all?
-      float availableCapacity = getAvailableCapacity(parentAbsoluteCapacity,
-          deactivatedCapacity, sumOfChildQueueActivatedCapacity);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-            "Parent queue : " + managedParentQueue.getQueueName() + " absCapacity = "
-                + parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = "
-                + leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = "
-                + deactivatedCapacity + " , absChildActivatedCapacity = "
-                + sumOfChildQueueActivatedCapacity + ", availableCapacity = "
-                + availableCapacity);
-      }
-
-      if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) {
-        //sort applications across leaf queues by submit time
-        List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Parent queue : " + managedParentQueue.getQueueName()
+              + ", nodeLabel = " + nodeLabel + ", absCapacity = "
+              + parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = "
+              + leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = "
+              + deactivatedCapacity + " , absChildActivatedCapacity = "
+              + sumOfChildQueueActivatedCapacity + ", availableCapacity = "
+              + availableCapacity);
+        }
 
-        if (pendingApps.size() > 0) {
-          int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated(
-              availableCapacity, leafQueueTemplateAbsoluteCapacity,
-              pendingApps.size());
+        if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) {
+          //sort applications across leaf queues by submit time
+          if (pendingApps.size() > 0) {
+            int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated(
+                availableCapacity, leafQueueTemplateAbsoluteCapacity,
+                pendingApps.size());
 
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found " + maxLeafQueuesTobeActivated
-                + " leaf queues to be activated with " + pendingApps.size()
-                + " apps ");
-          }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Found " + maxLeafQueuesTobeActivated + " leaf queues"
+                  + " to be activated with " + pendingApps.size() + " apps ");
+            }
 
-          LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues(
-              pendingApps, maxLeafQueuesTobeActivated,
-              deactivatedLeafQueues.keySet());
+            LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues(
+                nodeLabel, pendingApps, maxLeafQueuesTobeActivated,
+                deactivatedLeafQueues.keySet());
 
-          //Compute entitlement changes for the identified leaf queues
-          // which is appended to the List of queueManagementChanges
-          computeQueueManagementChanges(leafQueuesToBeActivated,
-              queueManagementChanges, availableCapacity,
-              leafQueueTemplateAbsoluteCapacity);
+            //Compute entitlement changes for the identified leaf queues
+            // which is appended to the List of computedEntitlements
+            updateLeafQueueCapacitiesByLabel(nodeLabel, leafQueuesToBeActivated,
+                leafQueueEntitlements);
 
-          if (LOG.isDebugEnabled()) {
-            if (leafQueuesToBeActivated.size() > 0) {
-              LOG.debug(
-                  "Activated leaf queues : [" + leafQueuesToBeActivated + "]");
+            if (LOG.isDebugEnabled()) {
+              if (leafQueuesToBeActivated.size() > 0) {
+                LOG.debug("Activated leaf queues : [" + leafQueuesToBeActivated
+                    + "]");
+              }
             }
           }
         }
       }
+
+      //Populate new entitlements
+
+      for (final Iterator<Map.Entry<String, QueueCapacities>> iterator =
+           leafQueueEntitlements.entrySet().iterator(); iterator.hasNext(); ) {
+        Map.Entry<String, QueueCapacities> queueCapacities = iterator.next();
+        String leafQueueName = queueCapacities.getKey();
+        AutoCreatedLeafQueue leafQueue =
+            (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
+                .getQueue(leafQueueName);
+        AutoCreatedLeafQueueConfig newTemplate = buildTemplate(
+            queueCapacities.getValue());
+        queueManagementChanges.add(
+            new QueueManagementChange.UpdateQueue(leafQueue, newTemplate));
+
+      }
       return queueManagementChanges;
     } finally {
       readLock.unlock();
@@ -369,14 +454,14 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
   }
 
   private float getTotalDeactivatedCapacity(
-      Map<String, QueueCapacities> deactivatedLeafQueues) {
+      Map<String, QueueCapacities> deactivatedLeafQueues, String nodeLabel) {
     float deactivatedCapacity = 0;
     for (Iterator<Map.Entry<String, QueueCapacities>> iterator =
          deactivatedLeafQueues.entrySet().iterator(); iterator.hasNext(); ) {
       Map.Entry<String, QueueCapacities> deactivatedQueueCapacity =
           iterator.next();
       deactivatedCapacity +=
-          deactivatedQueueCapacity.getValue().getAbsoluteCapacity();
+          deactivatedQueueCapacity.getValue().getAbsoluteCapacity(nodeLabel);
     }
     return deactivatedCapacity;
   }
@@ -385,20 +470,42 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
   void updateLeafQueueState() {
     try {
       writeLock.lock();
+      Set<String> newPartitions = new HashSet<>();
       Set<String> newQueues = new HashSet<>();
+
       for (CSQueue newQueue : managedParentQueue.getChildQueues()) {
         if (newQueue instanceof LeafQueue) {
-          addLeafQueueStateIfNotExists((LeafQueue) newQueue);
+          for (String nodeLabel : leafQueueTemplateNodeLabels) {
+            leafQueueState.createLeafQueueStateIfNotExists((LeafQueue) newQueue,
+                nodeLabel);
+            newPartitions.add(nodeLabel);
+          }
           newQueues.add(newQueue.getQueueName());
         }
       }
 
-      for (Iterator<Map.Entry<String, LeafQueueState>> itr =
-           leafQueueStateMap.entrySet().iterator(); itr.hasNext(); ) {
-        Map.Entry<String, LeafQueueState> e = itr.next();
-        String queueName = e.getKey();
-        if (!newQueues.contains(queueName)) {
+      for (Iterator<Map.Entry<String, Map<String, LeafQueueStatePerPartition>>>
+           itr = leafQueueState.getLeafQueueStateMap().entrySet().iterator();
+           itr.hasNext(); ) {
+        Map.Entry<String, Map<String, LeafQueueStatePerPartition>> e =
+            itr.next();
+        String partition = e.getKey();
+        if (!newPartitions.contains(partition)) {
           itr.remove();
+          LOG.info(
+              "Removed partition " + partition + " from leaf queue " + "state");
+        } else{
+          Map<String, LeafQueueStatePerPartition> queues = e.getValue();
+          for (
+              Iterator<Map.Entry<String, LeafQueueStatePerPartition>> queueItr =
+              queues.entrySet().iterator(); queueItr.hasNext(); ) {
+            String queue = queueItr.next().getKey();
+            if (!newQueues.contains(queue)) {
+              queueItr.remove();
+              LOG.info("Removed queue " + queue + " from leaf queue "
+                  + "state from partition " + partition);
+            }
+          }
         }
       }
     } finally {
@@ -406,22 +513,20 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
     }
   }
 
-  private LinkedHashSet<String> getSortedLeafQueues(
+  private LinkedHashSet<String> getSortedLeafQueues(String nodeLabel,
       final List<FiCaSchedulerApp> pendingApps, int leafQueuesNeeded,
       Set<String> deactivatedQueues) throws SchedulerDynamicEditException {
 
     LinkedHashSet<String> leafQueues = new LinkedHashSet<>(leafQueuesNeeded);
     int ctr = 0;
     for (FiCaSchedulerApp app : pendingApps) {
-
       AutoCreatedLeafQueue leafQueue =
           (AutoCreatedLeafQueue) app.getCSLeafQueue();
       String leafQueueName = leafQueue.getQueueName();
 
       //Check if leafQueue is not active already and has any pending apps
       if (ctr < leafQueuesNeeded) {
-
-        if (!isActive(leafQueue)) {
+        if (!isActive(leafQueue, nodeLabel)) {
           if (!deactivatedQueues.contains(leafQueueName)) {
             if (addLeafQueueIfNotExists(leafQueues, leafQueueName)) {
               ctr++;
@@ -445,11 +550,12 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
   }
 
   @VisibleForTesting
-  public boolean isActive(final AutoCreatedLeafQueue leafQueue)
-      throws SchedulerDynamicEditException {
+  public boolean isActive(final AutoCreatedLeafQueue leafQueue,
+      String nodeLabel) throws SchedulerDynamicEditException {
     try {
       readLock.lock();
-      LeafQueueState leafQueueStatus = getLeafQueueState(leafQueue);
+      LeafQueueStatePerPartition leafQueueStatus = getLeafQueueState(leafQueue,
+          nodeLabel);
       return leafQueueStatus.isActive();
     } finally {
       readLock.unlock();
@@ -457,64 +563,52 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
   }
 
   private Map<String, QueueCapacities> deactivateLeafQueuesIfInActive(
-      ParentQueue parentQueue,
-      List<QueueManagementChange> queueManagementChanges)
+      ParentQueue parentQueue, String nodeLabel,
+      Map<String, QueueCapacities> leafQueueEntitlements)
       throws SchedulerDynamicEditException {
     Map<String, QueueCapacities> deactivatedQueues = new HashMap<>();
 
     for (CSQueue childQueue : parentQueue.getChildQueues()) {
       AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
+      if (leafQueue != null) {
+        if (isActive(leafQueue, nodeLabel) && !hasPendingApps(leafQueue)) {
+          if (!leafQueueEntitlements.containsKey(leafQueue.getQueueName())) {
+            leafQueueEntitlements.put(leafQueue.getQueueName(),
+                new QueueCapacities(false));
+          }
 
-      if (isActive(leafQueue) && !hasPendingApps(leafQueue)) {
-        queueManagementChanges.add(
-            new QueueManagementChange.UpdateQueue(leafQueue,
-                ZERO_CAPACITY_ENTITLEMENT));
-        deactivatedQueues.put(leafQueue.getQueueName(),
-            leafQueueTemplateCapacities);
-      } else{
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(" Leaf queue has pending applications :  " + leafQueue
-              .getNumApplications() + ".Skipping deactivation for "
-              + leafQueue);
+          QueueCapacities capacities = leafQueueEntitlements.get(
+              leafQueue.getQueueName());
+          updateToZeroCapacity(capacities, nodeLabel);
+          deactivatedQueues.put(leafQueue.getQueueName(),
+              leafQueueTemplateCapacities);
+        } else{
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(" Leaf queue has pending applications or is " + "inactive"
+                + " : " + leafQueue.getNumApplications()
+                + ".Skipping deactivation for " + leafQueue);
+          }
         }
+      } else{
+        LOG.warn("Could not find queue in scheduler while trying" + " to "
+            + "deactivate for " + parentQueue);
       }
     }
 
-    if (LOG.isDebugEnabled()) {
-      if (deactivatedQueues.size() > 0) {
-        LOG.debug("Deactivated leaf queues : " + deactivatedQueues);
-      }
-    }
     return deactivatedQueues;
   }
 
-  private void computeQueueManagementChanges(
+  private void updateLeafQueueCapacitiesByLabel(String nodeLabel,
       Set<String> leafQueuesToBeActivated,
-      List<QueueManagementChange> queueManagementChanges,
-      final float availableCapacity,
-      final float leafQueueTemplateAbsoluteCapacity) {
-
-    float curAvailableCapacity = availableCapacity;
-
+      Map<String, QueueCapacities> leafQueueEntitlements) {
     for (String curLeafQueue : leafQueuesToBeActivated) {
-      // Activate queues if capacity is available
-      if (curAvailableCapacity >= leafQueueTemplateAbsoluteCapacity) {
-        AutoCreatedLeafQueue leafQueue =
-            (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
-                .getQueue(curLeafQueue);
-        if (leafQueue != null) {
-          AutoCreatedLeafQueueConfig newTemplate = buildTemplate(
-              leafQueueTemplateCapacities.getCapacity(),
-              leafQueueTemplateCapacities.getMaximumCapacity());
-          queueManagementChanges.add(
-              new QueueManagementChange.UpdateQueue(leafQueue, newTemplate));
-          curAvailableCapacity -= leafQueueTemplateAbsoluteCapacity;
-        } else{
-          LOG.warn(
-              "Could not find queue in scheduler while trying to deactivate "
-                  + curLeafQueue);
-        }
+      if (!leafQueueEntitlements.containsKey(curLeafQueue)) {
+        leafQueueEntitlements.put(curLeafQueue, new QueueCapacities(false));
+        // Activate queues if capacity is available
       }
+
+      QueueCapacities capacities = leafQueueEntitlements.get(curLeafQueue);
+      updateCapacityFromTemplate(capacities, nodeLabel);
     }
   }
 
@@ -528,17 +622,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
           availableCapacity / childQueueAbsoluteCapacity);
 
       return Math.min(numLeafQueuesNeeded, numPendingApps);
-    } else{
-      throw new SchedulerDynamicEditException("Child queue absolute capacity "
-          + "is initialized to 0. Check parent queue's  " + managedParentQueue
-          .getQueueName() + " leaf queue template configuration");
     }
-  }
-
-  private float getAvailableCapacity(float parentAbsCapacity,
-      float deactivatedAbsCapacity, float totalChildQueueActivatedCapacity) {
-    return parentAbsCapacity - totalChildQueueActivatedCapacity
-        + deactivatedAbsCapacity + EPSILON;
+    return 0;
   }
 
   /**
@@ -567,25 +652,27 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
 
         AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue;
 
-        if (updatedQueueTemplate.getQueueCapacities().getCapacity() > 0) {
-          if (isActive(leafQueue)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "Queue is already active. Skipping activation : " + queue
-                      .getQueuePath());
+        for (String nodeLabel : updatedQueueTemplate.getQueueCapacities()
+            .getExistingNodeLabels()) {
+          if (updatedQueueTemplate.getQueueCapacities().
+              getCapacity(nodeLabel) > 0) {
+            if (isActive(leafQueue, nodeLabel)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Queue is already active." + " Skipping activation : "
+                    + queue.getQueuePath());
+              }
+            } else{
+              activate(leafQueue, nodeLabel);
             }
           } else{
-            activate(leafQueue);
-          }
-        } else{
-          if (!isActive(leafQueue)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(
-                  "Queue is already de-activated. " + "Skipping de-activation "
-                      + ": " + leafQueue.getQueuePath());
+            if (!isActive(leafQueue, nodeLabel)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Queue is already de-activated. Skipping "
+                    + "de-activation : " + leafQueue.getQueuePath());
+              }
+            } else{
+              deactivate(leafQueue, nodeLabel);
             }
-          } else{
-            deactivate(leafQueue);
           }
         }
       }
@@ -594,30 +681,26 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
     }
   }
 
-  private void activate(final AutoCreatedLeafQueue leafQueue)
-      throws SchedulerDynamicEditException {
+  private void activate(final AbstractAutoCreatedLeafQueue leafQueue,
+      String nodeLabel) throws SchedulerDynamicEditException {
     try {
       writeLock.lock();
-      getLeafQueueState(leafQueue).activate();
-
-      parentQueueState.incAbsoluteActivatedChildCapacity(NO_LABEL,
-          leafQueueTemplateCapacities.getAbsoluteCapacity());
+      getLeafQueueState(leafQueue, nodeLabel).activate();
+      parentQueueState.incAbsoluteActivatedChildCapacity(nodeLabel,
+          leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
     } finally {
       writeLock.unlock();
     }
   }
 
-  private void deactivate(final AutoCreatedLeafQueue leafQueue)
-      throws SchedulerDynamicEditException {
+  private void deactivate(final AbstractAutoCreatedLeafQueue leafQueue,
+      String nodeLabel) throws SchedulerDynamicEditException {
     try {
       writeLock.lock();
-      getLeafQueueState(leafQueue).deactivate();
+      getLeafQueueState(leafQueue, nodeLabel).deactivate();
 
-      for (String nodeLabel : managedParentQueue.getQueueCapacities()
-          .getExistingNodeLabels()) {
-        parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
-            leafQueueTemplateCapacities.getAbsoluteCapacity());
-      }
+      parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
+          leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
     } finally {
       writeLock.unlock();
     }
@@ -629,7 +712,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
 
   @Override
   public void reinitialize(CapacitySchedulerContext schedulerContext,
-      final ParentQueue parentQueue) {
+      final ParentQueue parentQueue) throws IOException {
     if (!(parentQueue instanceof ManagedParentQueue)) {
       throw new IllegalStateException(
           "Expected instance of type " + ManagedParentQueue.class + " found  "
@@ -649,12 +732,11 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
 
     //clear state
     parentQueueState.clear();
-    clearLeafQueueState();
+    leafQueueState.clear();
 
     LOG.info(
-        "Reinitialized queue management policy for parent queue "
-            + parentQueue.getQueueName() +" with leaf queue template "
-            + "capacities : ["
+        "Reinitialized queue management policy for parent queue " + parentQueue
+            .getQueueName() + " with leaf queue template " + "capacities : ["
             + leafQueueTemplate.getQueueCapacities() + "]");
   }
 
@@ -663,51 +745,74 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
       AbstractAutoCreatedLeafQueue leafQueue)
       throws SchedulerDynamicEditException {
 
-    if ( !(leafQueue instanceof  AutoCreatedLeafQueue)) {
-      throw new SchedulerDynamicEditException("Not an instance of "
-          + "AutoCreatedLeafQueue : " + leafQueue.getClass());
+    AutoCreatedLeafQueueConfig template;
+
+    if (!(leafQueue instanceof AutoCreatedLeafQueue)) {
+      throw new SchedulerDynamicEditException(
+          "Not an instance of " + "AutoCreatedLeafQueue : " + leafQueue
+              .getClass());
     }
 
-    AutoCreatedLeafQueue autoCreatedLeafQueue =
-        (AutoCreatedLeafQueue) leafQueue;
-    AutoCreatedLeafQueueConfig template = ZERO_CAPACITY_ENTITLEMENT;
     try {
       writeLock.lock();
-      if (!addLeafQueueStateIfNotExists(leafQueue)) {
-        LOG.error("Leaf queue already exists in state : " + getLeafQueueState(
-            leafQueue));
-        throw new SchedulerDynamicEditException(
-            "Leaf queue already exists in state : " + getLeafQueueState(
-                leafQueue));
-      }
 
-      float availableCapacity = getAvailableCapacity(
-          managedParentQueue.getQueueCapacities().getAbsoluteCapacity(), 0,
-          parentQueueState.getAbsoluteActivatedChildQueueCapacity());
+      QueueCapacities capacities = new QueueCapacities(false);
+      for (String nodeLabel : leafQueueTemplateNodeLabels) {
+        if (!leafQueueState.createLeafQueueStateIfNotExists(leafQueue,
+            nodeLabel)) {
+          String message =
+              "Leaf queue already exists in state : " + getLeafQueueState(
+                  leafQueue, nodeLabel);
+          LOG.error(message);
+        }
 
-      if (availableCapacity >= leafQueueTemplateCapacities
-          .getAbsoluteCapacity()) {
-        activate(autoCreatedLeafQueue);
-        template = buildTemplate(leafQueueTemplateCapacities.getCapacity(),
-            leafQueueTemplateCapacities.getMaximumCapacity());
+        float availableCapacity = managedParentQueue.getQueueCapacities().
+            getAbsoluteCapacity(nodeLabel) - parentQueueState.
+            getAbsoluteActivatedChildQueueCapacity(nodeLabel) + EPSILON;
+
+        if (availableCapacity >= leafQueueTemplateCapacities
+            .getAbsoluteCapacity(nodeLabel)) {
+          updateCapacityFromTemplate(capacities, nodeLabel);
+          activate(leafQueue, nodeLabel);
+        } else{
+          updateToZeroCapacity(capacities, nodeLabel);
+        }
       }
+
+      template = buildTemplate(capacities);
     } finally {
       writeLock.unlock();
     }
     return template;
   }
 
+  private void updateToZeroCapacity(QueueCapacities capacities,
+      String nodeLabel) {
+    capacities.setCapacity(nodeLabel, 0.0f);
+    capacities.setMaximumCapacity(nodeLabel,
+        leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
+  }
+
+  private void updateCapacityFromTemplate(QueueCapacities capacities,
+      String nodeLabel) {
+    capacities.setCapacity(nodeLabel,
+        leafQueueTemplateCapacities.getCapacity(nodeLabel));
+    capacities.setMaximumCapacity(nodeLabel,
+        leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
+  }
+
   @VisibleForTesting
-  LeafQueueState getLeafQueueState(LeafQueue queue)
-      throws SchedulerDynamicEditException {
+  LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue,
+      String partition) throws SchedulerDynamicEditException {
     try {
       readLock.lock();
       String queueName = queue.getQueueName();
-      if (!containsLeafQueue(queueName)) {
+      if (!leafQueueState.containsLeafQueue(queueName, partition)) {
         throw new SchedulerDynamicEditException(
             "Could not find leaf queue in " + "state " + queueName);
       } else{
-        return leafQueueStateMap.get(queueName);
+        return leafQueueState.
+            getLeafQueueStatePerPartition(queueName, partition);
       }
     } finally {
       readLock.unlock();
@@ -715,8 +820,8 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
   }
 
   @VisibleForTesting
-  public float getAbsoluteActivatedChildQueueCapacity() {
-    return parentQueueState.getAbsoluteActivatedChildQueueCapacity();
+  public float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
+    return parentQueueState.getAbsoluteActivatedChildQueueCapacity(nodeLabel);
   }
 
   private List<FiCaSchedulerApp> getSortedPendingApplications() {
@@ -726,20 +831,10 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
     return apps;
   }
 
-  private AutoCreatedLeafQueueConfig buildTemplate(float capacity,
-      float maxCapacity) {
+  private AutoCreatedLeafQueueConfig buildTemplate(QueueCapacities capacities) {
     AutoCreatedLeafQueueConfig.Builder templateBuilder =
         new AutoCreatedLeafQueueConfig.Builder();
-
-    QueueCapacities capacities = new QueueCapacities(false);
     templateBuilder.capacities(capacities);
-
-    for (String nodeLabel : managedParentQueue.getQueueCapacities()
-        .getExistingNodeLabels()) {
-      capacities.setCapacity(nodeLabel, capacity);
-      capacities.setMaximumCapacity(nodeLabel, maxCapacity);
-    }
-
     return new AutoCreatedLeafQueueConfig(templateBuilder);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java
index 8765e86..8702c03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/PendingAskUpdateResult.java
@@ -62,4 +62,12 @@ public class PendingAskUpdateResult {
   public String getNewNodePartition() {
     return newNodePartition;
   }
+
+  @Override
+  public String toString() {
+    return "PendingAskUpdateResult{" + "lastPendingAsk=" + lastPendingAsk
+        + ", lastNodePartition='" + lastNodePartition + '\''
+        + ", newPendingAsk=" + newPendingAsk + ", newNodePartition='"
+        + newNodePartition + '\'' + '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 0a06e82..2e28395 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -65,6 +67,7 @@ public class MockNM {
       new HashMap<ContainerId, ContainerStatus>();
   private Map<ApplicationId, AppCollectorData> registeringCollectors
       = new ConcurrentHashMap<>();
+  private Set<NodeLabel> nodeLabels;
 
   public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
     // scale vcores based on the requested memory
@@ -101,6 +104,13 @@ public class MockNM {
     nodeId = BuilderUtils.newNodeId(splits[0], Integer.parseInt(splits[1]));
   }
 
+  public MockNM(String nodeIdStr, Resource capability,
+      ResourceTrackerService resourceTracker, String version, Set<NodeLabel>
+      nodeLabels) {
+    this(nodeIdStr, capability, resourceTracker, version);
+    this.nodeLabels = nodeLabels;
+  }
+
   public NodeId getNodeId() {
     return nodeId;
   }
@@ -164,12 +174,17 @@ public class MockNM {
       List<ApplicationId> runningApplications) throws Exception {
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
+
     req.setNodeId(nodeId);
     req.setHttpPort(httpPort);
     req.setResource(capability);
     req.setContainerStatuses(containerReports);
     req.setNMVersion(version);
     req.setRunningApplications(runningApplications);
+    if ( nodeLabels != null && nodeLabels.size() > 0) {
+      req.setNodeLabels(nodeLabels);
+    }
+
     RegisterNodeManagerResponse registrationResponse =
         resourceTracker.registerNodeManager(req);
     this.currentContainerTokenMasterKey =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/821b0de4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 1a1b527..f6cdfec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -247,10 +249,11 @@ public class TestAppManager{
   private TestRMAppManager appMonitor;
   private ApplicationSubmissionContext asContext;
   private ApplicationId appId;
+  private QueueInfo mockDefaultQueueInfo;
 
   @SuppressWarnings("deprecation")
   @Before
-  public void setUp() {
+  public void setUp() throws IOException {
     long now = System.currentTimeMillis();
 
     rmContext = mockRMContext(1, now - 10);
@@ -258,6 +261,7 @@ public class TestAppManager{
         .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class));
     ResourceScheduler scheduler = mockResourceScheduler();
     ((RMContextImpl)rmContext).setScheduler(scheduler);
+
     Configuration conf = new Configuration();
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     ((RMContextImpl) rmContext).setYarnConfiguration(conf);
@@ -275,6 +279,11 @@ public class TestAppManager{
     asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
     asContext.setResource(mockResource());
     asContext.setPriority(Priority.newInstance(0));
+    asContext.setQueue("default");
+    mockDefaultQueueInfo = mock(QueueInfo.class);
+    when(scheduler.getQueueInfo("default", false, false))
+        .thenReturn(mockDefaultQueueInfo);
+
     setupDispatcher(rmContext, conf);
   }
 
@@ -709,6 +718,7 @@ public class TestAppManager{
     for (ResourceRequest req : reqs) {
       req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
     }
+
     // setAMContainerResourceRequests has priority over
     // setAMContainerResourceRequest and setResource
     Assert.assertEquals(reqs, app.getAMResourceRequests());
@@ -722,6 +732,7 @@ public class TestAppManager{
     ResourceRequest req =
         ResourceRequest.newInstance(Priority.newInstance(0),
             ResourceRequest.ANY, Resources.createResource(1025), 1, true);
+    req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
     asContext.setAMContainerResourceRequest(cloneResourceRequest(req));
     // getAMContainerResourceRequests uses a singleton list of
     // getAMContainerResourceRequest
@@ -729,7 +740,6 @@ public class TestAppManager{
     Assert.assertEquals(req, asContext.getAMContainerResourceRequests().get(0));
     Assert.assertEquals(1, asContext.getAMContainerResourceRequests().size());
     RMApp app = testRMAppSubmit();
-    req.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
     // setAMContainerResourceRequest has priority over setResource
     Assert.assertEquals(Collections.singletonList(req),
         app.getAMResourceRequests());
@@ -740,10 +750,12 @@ public class TestAppManager{
     asContext.setResource(Resources.createResource(1024));
     asContext.setAMContainerResourceRequests(null);
     RMApp app = testRMAppSubmit();
+
     // setResource
     Assert.assertEquals(Collections.singletonList(
         ResourceRequest.newInstance(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
-        ResourceRequest.ANY, Resources.createResource(1024), 1, true, "")),
+        ResourceRequest.ANY, Resources.createResource(1024), 1, true,
+            "")),
         app.getAMResourceRequests());
   }
 
@@ -766,6 +778,8 @@ public class TestAppManager{
       throws Exception {
     asContext.setResource(null);
     List<ResourceRequest> reqs = new ArrayList<>();
+    when(mockDefaultQueueInfo.getAccessibleNodeLabels()).thenReturn
+        (new HashSet<String>() {{ add("label1"); add(""); }});
     ResourceRequest anyReq = ResourceRequest.newInstance(
         Priority.newInstance(1),
         ResourceRequest.ANY, Resources.createResource(1024), 1, false, "label1",


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to