YARN-7419. CapacityScheduler: Allow auto leaf queue creation after queue 
mapping. (Suma Shivaprasad via wangda)

Change-Id: Ia1704bb8cb5070e5b180b5a85787d7b9ca57ebc6


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0987a7b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0987a7b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0987a7b8

Branch: refs/heads/YARN-6592
Commit: 0987a7b8cbbbb2c1e4c2095821d98a7db19644df
Parents: f2efaf0
Author: Wangda Tan <wan...@apache.org>
Authored: Thu Nov 16 11:22:48 2017 -0800
Committer: Wangda Tan <wan...@apache.org>
Committed: Thu Nov 16 11:25:52 2017 -0800

----------------------------------------------------------------------
 .../server/resourcemanager/RMAppManager.java    |   7 +-
 .../placement/ApplicationPlacementContext.java  |  52 ++
 .../placement/PlacementManager.java             |  34 +-
 .../placement/PlacementRule.java                |   7 +-
 .../UserGroupMappingPlacementRule.java          | 284 ++++++-
 .../server/resourcemanager/rmapp/RMAppImpl.java |  87 +-
 .../scheduler/capacity/AbstractCSQueue.java     |   2 +-
 .../capacity/AbstractManagedParentQueue.java    | 196 +++--
 .../capacity/AutoCreatedLeafQueue.java          |  27 +-
 .../scheduler/capacity/CapacityScheduler.java   | 157 +++-
 .../CapacitySchedulerConfiguration.java         | 153 ++++
 .../capacity/CapacitySchedulerQueueManager.java | 103 ++-
 .../scheduler/capacity/ManagedParentQueue.java  | 158 ++++
 .../scheduler/capacity/ParentQueue.java         |  13 -
 .../scheduler/capacity/PlanQueue.java           |  25 +-
 .../scheduler/event/AppAddedSchedulerEvent.java |  37 +-
 .../server/resourcemanager/TestAppManager.java  |  29 +-
 .../TestUserGroupMappingPlacementRule.java      |  14 +-
 .../scheduler/TestSchedulerUtils.java           |   1 +
 .../capacity/TestCapacityScheduler.java         |   6 +-
 .../TestCapacitySchedulerAutoQueueCreation.java | 794 +++++++++++++++++++
 21 files changed, 1921 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index d042590..5e82f40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -360,13 +360,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
   private RMAppImpl createAndPopulateNewRMApp(
       ApplicationSubmissionContext submissionContext, long submitTime,
       String user, boolean isRecovery, long startTime) throws YarnException {
+
     if (!isRecovery) {
-      // Do queue mapping
-      if (rmContext.getQueuePlacementManager() != null) {
-        // We only do queue mapping when it's a new application
-        rmContext.getQueuePlacementManager().placeApplication(
-            submissionContext, user);
-      }
       // fail the submission if configured application timeout value is invalid
       RMServerUtils.validateApplicationTimeouts(
           submissionContext.getApplicationTimeouts());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
new file mode 100644
index 0000000..f2f92b8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.placement;
+
+/**
+ * Each placement rule when it successfully places an application onto a queue
+ * returns an ApplicationPlacementContext which encapsulates the queue the
+ * application was mapped to and any parent queue for the queue (if configured).
+ */
+public class ApplicationPlacementContext {
+
+  private String queue;
+
+  private String parentQueue;
+
+  public ApplicationPlacementContext(String queue) {
+    this(queue,null);
+  }
+
+  public ApplicationPlacementContext(String queue, String parentQueue) {
+    this.queue = queue;
+    this.parentQueue = parentQueue;
+  }
+
+  public String getQueue() {
+    return queue;
+  }
+
+  public String getParentQueue() {
+    return parentQueue;
+  }
+
+  public boolean hasParentQueue() {
+    return parentQueue != null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
index 43a4deb..c006738 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java
@@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -53,36 +52,33 @@ public class PlacementManager {
     }
   }
 
-  public void placeApplication(ApplicationSubmissionContext asc, String user)
-      throws YarnException {
+  public ApplicationPlacementContext placeApplication(
+      ApplicationSubmissionContext asc, String user) throws YarnException {
+
     try {
       readLock.lock();
+
       if (null == rules || rules.isEmpty()) {
-        return;
+        return null;
       }
-      
-      String newQueueName = null;
+
+      ApplicationPlacementContext placement = null;
       for (PlacementRule rule : rules) {
-        newQueueName = rule.getQueueForApp(asc, user);
-        if (newQueueName != null) {
+        placement = rule.getPlacementForApp(asc, user);
+        if (placement != null) {
           break;
         }
       }
-      
+
       // Failed to get where to place application
-      if (null == newQueueName && null == asc.getQueue()) {
-        String msg = "Failed to get where to place application="
-            + asc.getApplicationId();
+      if (null == placement && null == asc.getQueue()) {
+        String msg = "Failed to get where to place application=" + asc
+            .getApplicationId();
         LOG.error(msg);
         throw new YarnException(msg);
       }
-      
-      // Set it to ApplicationSubmissionContext
-      if (!StringUtils.equals(asc.getQueue(), newQueueName)) {
-        LOG.info("Placed application=" + asc.getApplicationId() + " to queue="
-            + newQueueName + ", original queue=" + asc.getQueue());
-        asc.setQueue(newQueueName);
-      }
+
+      return placement;
     } finally {
       readLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
index 47dc48a..a9d5e33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 
 public abstract class PlacementRule {
+
   public String getName() {
     return this.getClass().getName();
   }
@@ -50,6 +51,6 @@ public abstract class PlacementRule {
    *         in the {@link PlacementManager} will take care
    *         </p>
    */
-  public abstract String getQueueForApp(ApplicationSubmissionContext asc,
-      String user) throws YarnException;
-}
+  public abstract ApplicationPlacementContext getPlacementForApp(
+      ApplicationSubmissionContext asc, String user) throws YarnException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
index d617d16..9901f4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.yarn.server.resourcemanager.placement;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -32,6 +34,15 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
 
 public class UserGroupMappingPlacementRule extends PlacementRule {
   private static final Log LOG = LogFactory
@@ -66,17 +77,41 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
     MappingType type;
     String source;
     String queue;
+    String parentQueue;
+
+    public final static String DELIMITER = ":";
 
     public QueueMapping(MappingType type, String source, String queue) {
       this.type = type;
       this.source = source;
       this.queue = queue;
+      this.parentQueue = null;
     }
-    
+
+    public QueueMapping(MappingType type, String source,
+        String queue, String parentQueue) {
+      this.type = type;
+      this.source = source;
+      this.queue = queue;
+      this.parentQueue = parentQueue;
+    }
+
     public String getQueue() {
       return queue;
     }
-    
+
+    public String getParentQueue() {
+      return parentQueue;
+    }
+
+    public MappingType getType() {
+      return type;
+    }
+
+    public String getSource() {
+      return source;
+    }
+
     @Override
     public int hashCode() {
       return super.hashCode();
@@ -93,6 +128,13 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
         return false;
       }
     }
+
+    public String toString() {
+      return type.toString() + DELIMITER + source + DELIMITER +
+        (parentQueue != null ?
+        parentQueue + "." + queue :
+        queue);
+    }
   }
 
   public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
@@ -102,26 +144,27 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
     this.groups = groups;
   }
 
-  private String getMappedQueue(String user) throws IOException {
+  private ApplicationPlacementContext getPlacementForUser(String user)
+      throws IOException {
     for (QueueMapping mapping : mappings) {
       if (mapping.type == MappingType.USER) {
         if (mapping.source.equals(CURRENT_USER_MAPPING)) {
           if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
-            return user;
+            return getPlacementContext(mapping, user);
           } else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
-            return groups.getGroups(user).get(0);
+            return getPlacementContext(mapping, groups.getGroups(user).get(0));
           } else {
-            return mapping.queue;
+            return getPlacementContext(mapping);
           }
         }
         if (user.equals(mapping.source)) {
-          return mapping.queue;
+          return getPlacementContext(mapping);
         }
       }
       if (mapping.type == MappingType.GROUP) {
         for (String userGroups : groups.getGroups(user)) {
           if (userGroups.equals(mapping.source)) {
-            return mapping.queue;
+            return getPlacementContext(mapping);
           }
         }
       }
@@ -130,13 +173,14 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
   }
 
   @Override
-  public String getQueueForApp(ApplicationSubmissionContext asc, String user)
+  public ApplicationPlacementContext getPlacementForApp(
+      ApplicationSubmissionContext asc, String user)
       throws YarnException {
     String queueName = asc.getQueue();
     ApplicationId applicationId = asc.getApplicationId();
     if (mappings != null && mappings.size() > 0) {
       try {
-        String mappedQueue = getMappedQueue(user);
+        ApplicationPlacementContext mappedQueue = getPlacementForUser(user);
         if (mappedQueue != null) {
           // We have a mapping, should we use it?
           if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
@@ -153,10 +197,224 @@ public class UserGroupMappingPlacementRule extends PlacementRule {
         throw new YarnException(message);
       }
     }
-    
-    return queueName;
+    return null;
   }
-  
+
+  private ApplicationPlacementContext getPlacementContext(
+      QueueMapping mapping) {
+    return getPlacementContext(mapping, mapping.getQueue());
+  }
+
+  private ApplicationPlacementContext getPlacementContext(QueueMapping mapping,
+      String leafQueueName) {
+    if (!StringUtils.isEmpty(mapping.parentQueue)) {
+      return new ApplicationPlacementContext(leafQueueName,
+          mapping.getParentQueue());
+    } else{
+      return new ApplicationPlacementContext(leafQueueName);
+    }
+  }
+
+  @VisibleForTesting
+  public static UserGroupMappingPlacementRule get(
+      CapacitySchedulerContext schedulerContext) throws IOException {
+    CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
+    boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
+    LOG.info(
+        "Initialized queue mappings, override: " + overrideWithQueueMappings);
+
+    List<QueueMapping> queueMappings = conf.getQueueMappings();
+
+    // Get new user/group mappings
+    List<QueueMapping> newMappings = new ArrayList<>();
+
+    CapacitySchedulerQueueManager queueManager =
+        schedulerContext.getCapacitySchedulerQueueManager();
+
+    // check if mappings refer to valid queues
+    for (QueueMapping mapping : queueMappings) {
+
+      QueuePath queuePath = extractQueuePath(mapping.getQueue());
+      if (isStaticQueueMapping(mapping)) {
+        //Try getting queue by its leaf queue name
+        // without splitting into parent/leaf queues
+        CSQueue queue = queueManager.getQueue(mapping.getQueue());
+        if (ifQueueDoesNotExist(queue)) {
+          // Try getting the queue by extracting leaf and parent queue names,
+          // assuming it's a potential auto created leaf queue
+          queue = queueManager.getQueue(queuePath.getLeafQueue());
+
+          if (ifQueueDoesNotExist(queue)) {
+            // If the leaf queue does not exist,
+            // this could be a potential auto created leaf queue.
+            // Validate that if a parent queue is specified,
+            // then it exists and
+            // is an instance of ManagedParentQueue.
+            QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
+                queueManager, mapping, queuePath);
+            if (newMapping == null) {
+              throw new IOException(
+                  "mapping contains invalid or non-leaf queue " + mapping
+                      .getQueue());
+            }
+            newMappings.add(newMapping);
+          } else{
+            QueueMapping newMapping = validateAndGetQueueMapping(queueManager,
+                queue, mapping, queuePath);
+            newMappings.add(newMapping);
+          }
+        } else{
+          // if queue exists, validate
+          //   if it's an instance of leaf queue
+          //   if it's an instance of auto created leaf queue,
+          // then extract parent queue name and update queue mapping
+          QueueMapping newMapping = validateAndGetQueueMapping(queueManager,
+              queue, mapping, queuePath);
+          newMappings.add(newMapping);
+        }
+      } else{
+        // If it is a dynamic queue mapping,
+        // we can safely assume the leaf queue name does not have '.' in it.
+        // Validate that
+        // if a parent queue is specified, then the
+        // parent queue exists and is an instance of ManagedParentQueue.
+        //
+        QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
+            queueManager, mapping, queuePath);
+        if (newMapping != null) {
+          newMappings.add(newMapping);
+        } else{
+          newMappings.add(mapping);
+        }
+      }
+    }
+
+    // initialize groups if mappings are present
+    if (newMappings.size() > 0) {
+      Groups groups = new Groups(conf);
+      return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
+          newMappings, groups);
+    }
+
+    return null;
+  }
+
+  private static QueueMapping validateAndGetQueueMapping(
+      CapacitySchedulerQueueManager queueManager, CSQueue queue,
+      QueueMapping mapping, QueuePath queuePath) throws IOException {
+    if (!(queue instanceof LeafQueue)) {
+      throw new IOException(
+          "mapping contains invalid or non-leaf queue : " + mapping.getQueue());
+    }
+
+    if (queue instanceof AutoCreatedLeafQueue && queue
+        .getParent() instanceof ManagedParentQueue) {
+
+      QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
+          queueManager, mapping, queuePath);
+      if (newMapping == null) {
+        throw new IOException(
+            "mapping contains invalid or non-leaf queue " + mapping.getQueue());
+      }
+      return newMapping;
+    }
+    return mapping;
+  }
+
+  private static boolean ifQueueDoesNotExist(CSQueue queue) {
+    return queue == null;
+  }
+
+  private static QueueMapping validateAndGetAutoCreatedQueueMapping(
+      CapacitySchedulerQueueManager queueManager, QueueMapping mapping,
+      QueuePath queuePath) throws IOException {
+    if (queuePath.hasParentQueue()) {
+      //if parent queue is specified,
+      // then it should exist and be an instance of ManagedParentQueue
+      validateParentQueue(queueManager.getQueue(queuePath.getParentQueue()),
+          queuePath.getParentQueue(), queuePath.getLeafQueue());
+      return new QueueMapping(mapping.getType(), mapping.getSource(),
+          queuePath.getLeafQueue(), queuePath.getParentQueue());
+    }
+
+    return null;
+  }
+
+  private static boolean isStaticQueueMapping(QueueMapping mapping) {
+    return !mapping.getQueue().contains(
+        UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mapping
+        .getQueue().contains(
+            UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING);
+  }
+
+  private static class QueuePath {
+
+    public String parentQueue;
+    public String leafQueue;
+
+    public QueuePath(final String leafQueue) {
+      this.leafQueue = leafQueue;
+    }
+
+    public QueuePath(final String parentQueue, final String leafQueue) {
+      this.parentQueue = parentQueue;
+      this.leafQueue = leafQueue;
+    }
+
+    public String getParentQueue() {
+      return parentQueue;
+    }
+
+    public String getLeafQueue() {
+      return leafQueue;
+    }
+
+    public boolean hasParentQueue() {
+      return parentQueue != null;
+    }
+
+    @Override
+    public String toString() {
+      return parentQueue + DOT + leafQueue;
+    }
+  }
+
+  private static QueuePath extractQueuePath(String queueName)
+      throws IOException {
+    int parentQueueNameEndIndex = queueName.lastIndexOf(DOT);
+
+    if (parentQueueNameEndIndex > -1) {
+      final String parentQueue = queueName.substring(0, parentQueueNameEndIndex)
+          .trim();
+      final String leafQueue = queueName.substring(parentQueueNameEndIndex + 1)
+          .trim();
+      return new QueuePath(parentQueue, leafQueue);
+    }
+
+    return new QueuePath(queueName);
+  }
+
+  private static void validateParentQueue(CSQueue parentQueue,
+      String parentQueueName, String leafQueueName) throws IOException {
+    if (parentQueue == null) {
+      throw new IOException(
+          "mapping contains invalid or non-leaf queue [" + leafQueueName
+              + "] and invalid parent queue [" + parentQueueName + "]");
+    } else if (!(parentQueue instanceof ManagedParentQueue)) {
+      throw new IOException("mapping contains leaf queue [" + leafQueueName
+          + "] and invalid parent queue which "
+          + "does not have auto creation of leaf queues enabled ["
+          + parentQueueName + "]");
+    } else if (!parentQueue.getQueueName().equals(parentQueueName)) {
+      throw new IOException(
+          "mapping contains invalid or non-leaf queue [" + leafQueueName
+              + "] and invalid parent queue "
+              + "which does not match existing leaf queue's parent : ["
+              + parentQueueName + "] does not match [ " + parentQueue
+              .getQueueName() + "]");
+    }
+  }
+
   @VisibleForTesting
   public List<QueueMapping> getQueueMappings() {
     return mappings;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index ae5f6b4..85d355f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
@@ -83,6 +84,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
 import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
 import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -158,6 +161,8 @@ public class RMAppImpl implements RMApp, Recoverable {
 
   private boolean isNumAttemptsBeyondThreshold = false;
 
+
+
   // Mutable fields
   private long startTime;
   private long finishTime = 0;
@@ -1073,38 +1078,51 @@ public class RMAppImpl implements RMApp, Recoverable {
                   app.getUser(),
                   BuilderUtils.parseTokensConf(app.submissionContext));
         } catch (Exception e) {
-          String msg = "Failed to fetch user credentials from application:"
-              + e.getMessage();
+          String msg = "Failed to fetch user credentials from application:" + e
+              .getMessage();
           app.diagnostics.append(msg);
           LOG.error(msg, e);
         }
       }
 
-      for (Map.Entry<ApplicationTimeoutType, Long> timeout :
-        app.applicationTimeouts.entrySet()) {
+      for (Map.Entry<ApplicationTimeoutType, Long> timeout : app.applicationTimeouts
+          .entrySet()) {
         app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId,
             timeout.getKey(), timeout.getValue());
         if (LOG.isDebugEnabled()) {
           long remainingTime = timeout.getValue() - app.systemClock.getTime();
           LOG.debug("Application " + app.applicationId
               + " is registered for timeout monitor, type=" + timeout.getKey()
-              + " remaining timeout="
-              + (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds");
+              + " remaining timeout=" + (remainingTime > 0 ?
+              remainingTime / 1000 :
+              0) + " seconds");
         }
       }
 
+      ApplicationPlacementContext placementContext = null;
+      try {
+        placementContext = placeApplication(app.rmContext,
+            app.submissionContext, app.user);
+      } catch (Exception e) {
+        String msg = "Failed to place application to queue :" + e.getMessage();
+        app.diagnostics.append(msg);
+        LOG.error(msg, e);
+      }
+
       // No existent attempts means the attempt associated with this app was not
       // started or started but not yet saved.
       if (app.attempts.isEmpty()) {
-        app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
-            app.submissionContext, false, app.applicationPriority));
+        app.scheduler.handle(
+            new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
+                app.applicationPriority, placementContext));
         return RMAppState.SUBMITTED;
       }
 
       // Add application to scheduler synchronously to guarantee scheduler
       // knows applications before AM or NM re-registers.
-      app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
-          app.submissionContext, true, app.applicationPriority));
+      app.scheduler.handle(
+          new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
+              app.applicationPriority, placementContext));
 
       // recover attempts
       app.recoverAppAttempts();
@@ -1120,8 +1138,20 @@ public class RMAppImpl implements RMApp, Recoverable {
       RMAppTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
-      app.handler.handle(new AppAddedSchedulerEvent(app.user,
-          app.submissionContext, false, app.applicationPriority));
+      ApplicationPlacementContext placementContext = null;
+      try {
+        placementContext = placeApplication(app.rmContext,
+            app.submissionContext, app.user);
+        replaceQueueFromPlacementContext(placementContext,
+            app.submissionContext);
+      } catch (YarnException e) {
+        String msg = "Failed to place application to queue :" + e.getMessage();
+        app.diagnostics.append(msg);
+        LOG.error(msg, e);
+      }
+      app.handler.handle(
+          new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
+              app.applicationPriority, placementContext));
       // send the ATS create Event
       app.sendATSCreateEvent();
     }
@@ -2013,4 +2043,37 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.submissionContext.setAMContainerSpec(null);
     this.submissionContext.setLogAggregationContext(null);
   }
+
+  @VisibleForTesting
+  static ApplicationPlacementContext placeApplication(RMContext rmContext,
+      ApplicationSubmissionContext context, String user) throws YarnException {
+
+    ApplicationPlacementContext placementContext = null;
+    PlacementManager placementManager = rmContext.getQueuePlacementManager();
+
+    if (placementManager != null) {
+      placementContext = placementManager.placeApplication(context, user);
+    } else{
+      LOG.error(
+          "Queue Placement Manager is null. Cannot place application :" + " "
+              + context.getApplicationId() + " to queue ");
+    }
+
+    return placementContext;
+  }
+
+  static void replaceQueueFromPlacementContext(
+      ApplicationPlacementContext placementContext,
+      ApplicationSubmissionContext context) {
+    // Set it to ApplicationSubmissionContext
+    //apply queue mapping only to new application submissions
+    if (placementContext != null && !StringUtils.equals(context.getQueue(),
+        placementContext.getQueue())) {
+      LOG.info("Placed application=" + context.getApplicationId() + " to queue="
+          + placementContext.getQueue() + ", original queue=" + context
+          .getQueue());
+      context.setQueue(placementContext.getQueue());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 183cb36..74c85ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -78,7 +78,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   final String queueName;
   private final String queuePath;
   volatile int numContainers;
-  
+
   final Resource minimumAllocation;
   volatile Resource maximumAllocation;
   private volatile QueueState state = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
index b3d1b47..46f5cf1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
@@ -35,31 +35,13 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbstractManagedParentQueue.class);
 
-  private int maxAppsForAutoCreatedQueues;
-  private int maxAppsPerUserForAutoCreatedQueues;
-  private int userLimit;
-  private float userLimitFactor;
+  protected AutoCreatedLeafQueueTemplate leafQueueTemplate;
 
   public AbstractManagedParentQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
 
     super.setupQueueConfigs(csContext.getClusterResource());
-    initializeLeafQueueConfigs();
-
-    StringBuffer queueInfo = new StringBuffer();
-    queueInfo.append("Created Managed Parent Queue: ").append(queueName)
-        .append("\nof type : [" + getClass())
-        .append("]\nwith capacity: [")
-        .append(super.getCapacity()).append("]\nwith max capacity: [")
-        .append(super.getMaximumCapacity()).append("\nwith max apps: [")
-        .append(getMaxApplicationsForAutoCreatedQueues())
-        .append("]\nwith max apps per user: [")
-        .append(getMaxApplicationsPerUserForAutoCreatedQueues())
-        .append("]\nwith user limit: [").append(getUserLimit())
-        .append("]\nwith user limit factor: [")
-        .append(getUserLimitFactor()).append("].");
-    LOG.info(queueInfo.toString());
   }
 
   @Override
@@ -71,8 +53,6 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
       // Set new configs
       setupQueueConfigs(clusterResource);
 
-      initializeLeafQueueConfigs();
-
       // run reinitialize on each existing queue, to trigger absolute cap
       // recomputations
       for (CSQueue res : this.getChildQueues()) {
@@ -87,72 +67,29 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
    * Initialize leaf queue configs from template configurations specified on
    * parent queue.
    */
-  protected void initializeLeafQueueConfigs() {
+  protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs
+    (String queuePath) {
 
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
 
-    final String queuePath = super.getQueuePath();
+    AutoCreatedLeafQueueTemplate.Builder leafQueueTemplateBuilder = new
+        AutoCreatedLeafQueueTemplate.Builder();
     int maxApps = conf.getMaximumApplicationsPerQueue(queuePath);
     if (maxApps < 0) {
       maxApps = (int) (
           CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS
               * getAbsoluteCapacity());
     }
-    userLimit = conf.getUserLimit(queuePath);
-    userLimitFactor = conf.getUserLimitFactor(queuePath);
-    maxAppsForAutoCreatedQueues = maxApps;
-    maxAppsPerUserForAutoCreatedQueues =
-        (int) (maxApps * (userLimit / 100.0f) * userLimitFactor);
-
-  }
-
-  /**
-   * Number of maximum applications for each of the auto created leaf queues.
-   *
-   * @return maxAppsForAutoCreatedQueues
-   */
-  public int getMaxApplicationsForAutoCreatedQueues() {
-    return maxAppsForAutoCreatedQueues;
-  }
-
-  /**
-   * Number of maximum applications per user for each of the auto created
-   * leaf queues.
-   *
-   * @return maxAppsPerUserForAutoCreatedQueues
-   */
-  public int getMaxApplicationsPerUserForAutoCreatedQueues() {
-    return maxAppsPerUserForAutoCreatedQueues;
-  }
-
-  /**
-   * User limit value for each of the  auto created leaf queues.
-   *
-   * @return userLimit
-   */
-  public int getUserLimitForAutoCreatedQueues() {
-    return userLimit;
-  }
-
-  /**
-   * User limit factor value for each of the  auto created leaf queues.
-   *
-   * @return userLimitFactor
-   */
-  public float getUserLimitFactor() {
-    return userLimitFactor;
-  }
 
-  public int getMaxAppsForAutoCreatedQueues() {
-    return maxAppsForAutoCreatedQueues;
-  }
+    int userLimit = conf.getUserLimit(queuePath);
+    float userLimitFactor = conf.getUserLimitFactor(queuePath);
+    leafQueueTemplateBuilder.userLimit(userLimit)
+          .userLimitFactor(userLimitFactor)
+          .maxApps(maxApps)
+          .maxAppsPerUser(
+              (int) (maxApps * (userLimit / 100.0f) * userLimitFactor));
 
-  public int getMaxAppsPerUserForAutoCreatedQueues() {
-    return maxAppsPerUserForAutoCreatedQueues;
-  }
-
-  public int getUserLimit() {
-    return userLimit;
+    return leafQueueTemplateBuilder;
   }
 
   /**
@@ -229,4 +166,111 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
     }
     return childQueue;
   }
+
+  protected float sumOfChildCapacities() {
+    try {
+      writeLock.lock();
+      float ret = 0;
+      for (CSQueue l : childQueues) {
+        ret += l.getCapacity();
+      }
+      return ret;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  protected float sumOfChildAbsCapacities() {
+    try {
+      writeLock.lock();
+      float ret = 0;
+      for (CSQueue l : childQueues) {
+        ret += l.getAbsoluteCapacity();
+      }
+      return ret;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public static class AutoCreatedLeafQueueTemplate {
+
+    private QueueCapacities queueCapacities;
+
+    private int maxApps;
+    private int maxAppsPerUser;
+    private int userLimit;
+    private float userLimitFactor;
+
+    AutoCreatedLeafQueueTemplate(Builder builder) {
+      this.maxApps = builder.maxApps;
+      this.maxAppsPerUser = builder.maxAppsPerUser;
+      this.userLimit = builder.userLimit;
+      this.userLimitFactor = builder.userLimitFactor;
+      this.queueCapacities = builder.queueCapacities;
+    }
+
+    public static class Builder {
+      private int maxApps;
+      private int maxAppsPerUser;
+
+      private int userLimit;
+      private float userLimitFactor;
+
+      private QueueCapacities queueCapacities;
+
+      Builder maxApps(int maxApplications) {
+        this.maxApps =  maxApplications;
+        return this;
+      }
+
+      Builder maxAppsPerUser(int maxApplicationsPerUser) {
+        this.maxAppsPerUser = maxApplicationsPerUser;
+        return this;
+      }
+
+      Builder userLimit(int usrLimit) {
+        this.userLimit = usrLimit;
+        return this;
+      }
+
+      Builder userLimitFactor(float ulf) {
+        this.userLimitFactor = ulf;
+        return this;
+      }
+
+      Builder capacities(QueueCapacities capacities) {
+        this.queueCapacities = capacities;
+        return this;
+      }
+
+      AutoCreatedLeafQueueTemplate build() {
+        return new AutoCreatedLeafQueueTemplate(this);
+      }
+    }
+
+    public int getUserLimit() {
+      return userLimit;
+    }
+
+    public float getUserLimitFactor() {
+      return userLimitFactor;
+    }
+
+    public QueueCapacities getQueueCapacities() {
+      return queueCapacities;
+    }
+
+    public int getMaxApps() {
+      return maxApps;
+    }
+
+    public int getMaxAppsPerUser() {
+      return maxAppsPerUser;
+    }
+  }
+
+  public AutoCreatedLeafQueueTemplate getLeafQueueTemplate() {
+    return leafQueueTemplate;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
index 4eb7cdd..bc206d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,17 +44,18 @@ public class AutoCreatedLeafQueue extends LeafQueue {
       AbstractManagedParentQueue parent) throws IOException {
     super(cs, queueName, parent, null);
 
-    updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(),
-        parent.getUserLimitFactor(),
-        parent.getMaxApplicationsForAutoCreatedQueues(),
-        parent.getMaxApplicationsPerUserForAutoCreatedQueues());
-
+    AutoCreatedLeafQueueTemplate leafQueueTemplate =
+        parent.getLeafQueueTemplate();
+    updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(),
+        leafQueueTemplate.getUserLimitFactor(),
+        leafQueueTemplate.getMaxApps(),
+        leafQueueTemplate.getMaxAppsPerUser());
     this.parent = parent;
   }
 
   @Override
-  public void reinitialize(CSQueue newlyParsedQueue,
-      Resource clusterResource) throws IOException {
+  public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
+      throws IOException {
     try {
       writeLock.lock();
 
@@ -62,10 +65,12 @@ public class AutoCreatedLeafQueue extends LeafQueue {
       CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
           this, labelManager, null);
 
-      updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(),
-          parent.getUserLimitFactor(),
-          parent.getMaxApplicationsForAutoCreatedQueues(),
-          parent.getMaxApplicationsPerUserForAutoCreatedQueues());
+      AutoCreatedLeafQueueTemplate leafQueueTemplate =
+          parent.getLeafQueueTemplate();
+      updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(),
+          leafQueueTemplate.getUserLimitFactor(),
+          leafQueueTemplate.getMaxApps(),
+          leafQueueTemplate.getMaxAppsPerUser());
 
     } finally {
       writeLock.unlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6f630f8..ed30ad1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -64,10 +63,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
 import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
-import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
@@ -146,6 +145,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.SettableFuture;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QUEUE_MAPPING;
+
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
@@ -560,44 +561,17 @@ public class CapacityScheduler extends
   }
 
   @VisibleForTesting
-  public UserGroupMappingPlacementRule
-      getUserGroupMappingPlacementRule() throws IOException {
+  public PlacementRule getUserGroupMappingPlacementRule() throws IOException {
     try {
       readLock.lock();
-      boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
-      LOG.info(
-          "Initialized queue mappings, override: " + overrideWithQueueMappings);
-
-      // Get new user/group mappings
-      List<QueueMapping> newMappings = conf.getQueueMappings();
-      // check if mappings refer to valid queues
-      for (QueueMapping mapping : newMappings) {
-        String mappingQueue = mapping.getQueue();
-        if (!mappingQueue.equals(
-            UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
-            .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
-          CSQueue queue = getQueue(mappingQueue);
-          if (queue == null || !(queue instanceof LeafQueue)) {
-            throw new IOException(
-                "mapping contains invalid or non-leaf queue " + mappingQueue);
-          }
-        }
-      }
-
-      // initialize groups if mappings are present
-      if (newMappings.size() > 0) {
-        Groups groups = new Groups(conf);
-        return new UserGroupMappingPlacementRule(overrideWithQueueMappings,
-            newMappings, groups);
-      }
-
-      return null;
+      return UserGroupMappingPlacementRule.get(this);
     } finally {
       readLock.unlock();
     }
   }
 
-  private void updatePlacementRules() throws IOException {
+  @VisibleForTesting
+  void updatePlacementRules() throws IOException {
     // Initialize placement rules
     Collection<String> placementRuleStrs = conf.getStringCollection(
         YarnConfiguration.QUEUE_PLACEMENT_RULES);
@@ -731,37 +705,92 @@ public class CapacityScheduler extends
     }
   }
 
-  private void addApplication(ApplicationId applicationId,
-      String queueName, String user, Priority priority) {
+  private void addApplication(ApplicationId applicationId, String queueName,
+      String user, Priority priority,
+      ApplicationPlacementContext placementContext) {
     try {
       writeLock.lock();
       if (isSystemAppsLimitReached()) {
         String message = "Maximum system application limit reached,"
             + "cannot accept submission of application: " + applicationId;
-        this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(
-            applicationId, RMAppEventType.APP_REJECTED, message));
+        this.rmContext.getDispatcher().getEventHandler().handle(
+            new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                message));
         return;
       }
       // Sanity checks.
       CSQueue queue = getQueue(queueName);
+
+      if (queue == null && placementContext != null) {
+        //Could be a potential auto-created leaf queue
+        try {
+          queue = autoCreateLeafQueue(placementContext);
+        } catch (YarnException | IOException e) {
+          LOG.error("Could not auto-create leaf queue due to : ", e);
+          final String message =
+              "Application " + applicationId + " submission by user : " + user
+                  + " to  queue : " + queueName + " failed : " + e.getMessage();
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                  message));
+        }
+      }
+
       if (queue == null) {
-        String message =
+        final String message =
             "Application " + applicationId + " submitted by user " + user
                 + " to unknown queue: " + queueName;
+
         this.rmContext.getDispatcher().getEventHandler().handle(
             new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
                 message));
         return;
       }
+
       if (!(queue instanceof LeafQueue)) {
         String message =
-            "Application " + applicationId + " submitted by user " + user
-                + " to non-leaf queue: " + queueName;
+            "Application " + applicationId + " submitted by user : " + user
+                + " to non-leaf queue : " + queueName;
         this.rmContext.getDispatcher().getEventHandler().handle(
             new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
                 message));
         return;
+      } else if (queue instanceof AutoCreatedLeafQueue && queue
+          .getParent() instanceof ManagedParentQueue) {
+
+        //If queue already exists and auto-queue creation was not required,
+        //placement context should not be null
+        if (placementContext == null) {
+          String message =
+              "Application " + applicationId + " submission by user : " + user
+                  + " to specified queue : " + queueName + "  is prohibited. "
+                  + "Verify automatic queue mapping for user exists in " +
+                  QUEUE_MAPPING;
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                  message));
+          return;
+          // For a queue which exists already and
+          // not auto-created above, then its parent queue should match
+          // the parent queue specified in queue mapping
+        } else if (!queue.getParent().getQueueName().equals(
+            placementContext.getParentQueue())) {
+          String message =
+              "Auto created Leaf queue " + placementContext.getQueue() + " "
+                  + "already exists under queue : " + queue
+                  .getParent().getQueuePath()
+                  + ".But Queue mapping configuration " +
+                   CapacitySchedulerConfiguration.QUEUE_MAPPING + " has been "
+                  + "updated to a different parent queue : "
+                  + placementContext.getParentQueue()
+                  + " for the specified user : " + user;
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
+                  message));
+          return;
+        }
       }
+
       // Submit to the queue
       try {
         queue.submitApplication(applicationId, user, queueName);
@@ -1483,7 +1512,8 @@ public class CapacityScheduler extends
       if (queueName != null) {
         if (!appAddedEvent.getIsAppRecovering()) {
           addApplication(appAddedEvent.getApplicationId(), queueName,
-              appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
+              appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority(),
+              appAddedEvent.getPlacementContext());
         } else {
           addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
               appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
@@ -2001,7 +2031,8 @@ public class CapacityScheduler extends
     try {
       writeLock.lock();
       LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
-      ParentQueue parent = (ParentQueue) queue.getParent();
+      AbstractManagedParentQueue parent = (AbstractManagedParentQueue) queue
+          .getParent();
 
       if (!(queue instanceof AutoCreatedLeafQueue)) {
         throw new SchedulerDynamicEditException(
@@ -2010,7 +2041,8 @@ public class CapacityScheduler extends
       }
 
       if (parent == null
-          || !(AbstractManagedParentQueue.class.isAssignableFrom(parent.getClass()))) {
+          || !(AbstractManagedParentQueue.class.isAssignableFrom(
+              parent.getClass()))) {
         throw new SchedulerDynamicEditException(
             "The parent of AutoCreatedLeafQueue " + inQueue
                 + " must be a PlanQueue/ManagedParentQueue");
@@ -2655,4 +2687,43 @@ public class CapacityScheduler extends
     }
     return null;
   }
+
+  private LeafQueue autoCreateLeafQueue(
+      ApplicationPlacementContext placementContext)
+      throws IOException, YarnException {
+
+    AutoCreatedLeafQueue autoCreatedLeafQueue = null;
+
+    String leafQueueName = placementContext.getQueue();
+    String parentQueueName = placementContext.getParentQueue();
+
+    if (!StringUtils.isEmpty(parentQueueName)) {
+      CSQueue parentQueue = getQueue(parentQueueName);
+
+      if (parentQueue != null && conf.isAutoCreateChildQueueEnabled(
+          parentQueue.getQueuePath())) {
+
+        ManagedParentQueue autoCreateEnabledParentQueue =
+            (ManagedParentQueue) parentQueue;
+        autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName,
+            autoCreateEnabledParentQueue);
+
+        addQueue(autoCreatedLeafQueue);
+
+        //TODO - Set entitlement through capacity management policy
+      } else{
+        throw new SchedulerDynamicEditException(
+            "Could not auto-create leaf queue for " + leafQueueName
+                + ". Queue mapping specifies an invalid parent queue "
+                + "which does not exist "
+                + parentQueueName);
+      }
+    } else{
+      throw new SchedulerDynamicEditException(
+          "Could not auto-create leaf queue for " + leafQueueName
+              + ". Queue mapping does not specify"
+              + " which parent queue it needs to be created under.");
+    }
+    return autoCreatedLeafQueue;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index bfead35..4515453 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -907,6 +907,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
   }
 
+  @Private
+  @VisibleForTesting
+  public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) {
+    setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings);
+  }
+
   /**
    * Returns a collection of strings, trimming leading and trailing whitespeace
    * on each value
@@ -981,6 +987,31 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     return mappings;
   }
 
+  @Private
+  @VisibleForTesting
+  public void setQueuePlacementRules(Collection<String> queuePlacementRules) {
+    if (queuePlacementRules == null) {
+      return;
+    }
+    String str = StringUtils.join(",", queuePlacementRules);
+    setStrings(YarnConfiguration.QUEUE_PLACEMENT_RULES, str);
+  }
+
+  @Private
+  @VisibleForTesting
+  public void setQueueMappings(List<QueueMapping> queueMappings) {
+    if (queueMappings == null) {
+      return;
+    }
+
+    List<String> queueMappingStrs = new ArrayList<>();
+    for (QueueMapping mapping : queueMappings) {
+      queueMappingStrs.add(mapping.toString());
+    }
+
+    setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs));
+  }
+
   public boolean isReservable(String queue) {
     boolean isReservable =
         getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false);
@@ -1523,4 +1554,126 @@ public class CapacitySchedulerConfiguration extends 
ReservationSchedulerConfigur
   public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) {
     setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime);
   }
+
+  @Private
+  public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false;
+
+  @Private
+  public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED =
+      "auto-create-child-queue.enabled";
+
+  @Private
+  public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX =
+      "leaf-queue-template";
+
+  @Private
+  public static final String AUTO_CREATE_QUEUE_MAX_QUEUES =
+      "auto-create-child-queue.max-queues";
+
+  @Private
+  public static final int DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES = 1000;
+
+  /**
+   * If true, this queue will be created as a Parent Queue which auto-creates
+   * leaf child queues
+   *
+   * @param queuePath The queue's path
+   * @return true if auto create is enabled for child queues else false. Default
+   * is false
+   */
+  @Private
+  public boolean isAutoCreateChildQueueEnabled(String queuePath) {
+    boolean isAutoCreateEnabled = getBoolean(
+        getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_ENABLED,
+        DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED);
+    return isAutoCreateEnabled;
+  }
+
+  @Private
+  @VisibleForTesting
+  public void setAutoCreateChildQueueEnabled(String queuePath,
+      boolean autoCreationEnabled) {
+    setBoolean(getQueuePrefix(queuePath) +
+            AUTO_CREATE_CHILD_QUEUE_ENABLED,
+        autoCreationEnabled);
+  }
+
+  /**
+   * Get the auto created leaf queue's template configuration prefix
+   * Leaf queue's template capacities are configured at the parent queue
+   *
+   * @param queuePath parent queue's path
+   * @return Config prefix for leaf queue template configurations
+   */
+  @Private
+  public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) {
+    return queuePath + DOT + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX;
+  }
+
+  @Private
+  public static final String FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
+      "auto-create-child-queue.fail-on-exceeding-parent-capacity";
+
+  @Private
+  public static final boolean DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
+      false;
+
+  /**
+   * Fail further auto leaf queue creation when parent's guaranteed capacity is
+   * exceeded.
+   *
+   * @param queuePath the parent queue's path
+   * @return true if configured to fail else false
+   */
+  @Private
+  public boolean getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
+      String queuePath) {
+    boolean shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity =
+        getBoolean(getQueuePrefix(queuePath)
+                + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
+            DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY);
+    return shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity;
+  }
+
+  @VisibleForTesting
+  @Private
+  public void setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
+      String queuePath, boolean autoCreationEnabled) {
+    setBoolean(
+        getQueuePrefix(queuePath) +
+            FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY,
+        autoCreationEnabled);
+  }
+
+  /**
+   * Get the max number of leaf queues that are allowed to be created under
+   * a parent queue
+   *
+   * @param queuePath the parent queue's path
+   * @return the max number of leaf queues allowed to be auto created
+   */
+  @Private
+  public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) {
+    return getInt(getQueuePrefix(queuePath) +
+            AUTO_CREATE_QUEUE_MAX_QUEUES,
+        DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES);
+  }
+
+  @Private
+  @VisibleForTesting
+  public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath,
+      float val) {
+    String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
+        queuePath);
+    setCapacity(leafQueueConfPrefix, val);
+  }
+
+  @Private
+  @VisibleForTesting
+  public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
+      float val) {
+    String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
+        queuePath);
+    setMaximumCapacity(leafQueueConfPrefix, val);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index 7be2529..eb50123 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -154,7 +154,7 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
    * @throws IOException if fails to initialize queues
    */
   public void initializeQueues(CapacitySchedulerConfiguration conf)
-      throws IOException {
+    throws IOException {
     root = parseQueue(this.csContext, conf, null,
         CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
     setQueueAcls(authorizer, appPriorityACLManager, queues);
@@ -176,7 +176,7 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
     if (!csContext.isConfigurationMutable() ||
         csContext.getRMContext().getHAServiceState()
             != HAServiceProtocol.HAServiceState.STANDBY) {
-      // Ensure queue hiearchy in the new XML file is proper.
+      // Ensure queue hierarchy in the new XML file is proper.
       validateQueueHierarchy(queues, newQueues);
     }
 
@@ -216,11 +216,13 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
       Map<String, CSQueue> oldQueues,
       QueueHook hook) throws IOException {
     CSQueue queue;
-    String fullQueueName =
-        (parent == null) ? queueName
-            : (parent.getQueuePath() + "." + queueName);
+    String fullQueueName = (parent == null) ?
+        queueName :
+        (parent.getQueuePath() + "." + queueName);
     String[] childQueueNames = conf.getQueues(fullQueueName);
     boolean isReservableQueue = conf.isReservable(fullQueueName);
+    boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(
+        fullQueueName);
     if (childQueueNames == null || childQueueNames.length == 0) {
       if (null == parent) {
         throw new IllegalStateException(
@@ -229,9 +231,8 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
       // Check if the queue will be dynamically managed by the Reservation
       // system
       if (isReservableQueue) {
-        queue =
-            new PlanQueue(csContext, queueName, parent,
-                oldQueues.get(queueName));
+        queue = new PlanQueue(csContext, queueName, parent,
+            oldQueues.get(queueName));
 
         //initializing the "internal" default queue, for SLS compatibility
         String defReservationId =
@@ -249,38 +250,46 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
         ((PlanQueue) queue).setChildQueues(childQueues);
         queues.put(defReservationId, resQueue);
 
-      } else {
-        queue =
-            new LeafQueue(csContext, queueName, parent,
-                oldQueues.get(queueName));
+      } else if (isAutoCreateEnabled) {
+        queue = new ManagedParentQueue(csContext, queueName, parent,
+            oldQueues.get(queueName));
 
+      } else{
+        queue = new LeafQueue(csContext, queueName, parent,
+            oldQueues.get(queueName));
         // Used only for unit tests
         queue = hook.hook(queue);
       }
-    } else {
+    } else{
       if (isReservableQueue) {
         throw new IllegalStateException(
             "Only Leaf Queues can be reservable for " + queueName);
       }
-      ParentQueue parentQueue =
-          new ParentQueue(csContext, queueName, parent,
-              oldQueues.get(queueName));
+
+      ParentQueue parentQueue;
+      if (isAutoCreateEnabled) {
+        parentQueue = new ManagedParentQueue(csContext, queueName, parent,
+            oldQueues.get(queueName));
+      } else{
+        parentQueue = new ParentQueue(csContext, queueName, parent,
+            oldQueues.get(queueName));
+      }
 
       // Used only for unit tests
       queue = hook.hook(parentQueue);
 
       List<CSQueue> childQueues = new ArrayList<>();
       for (String childQueueName : childQueueNames) {
-        CSQueue childQueue =
-            parseQueue(csContext, conf, queue, childQueueName,
-              queues, oldQueues, hook);
+        CSQueue childQueue = parseQueue(csContext, conf, queue, childQueueName,
+            queues, oldQueues, hook);
         childQueues.add(childQueue);
       }
       parentQueue.setChildQueues(childQueues);
+
     }
 
-    if (queue instanceof LeafQueue && queues.containsKey(queueName)
-        && queues.get(queueName) instanceof LeafQueue) {
+    if (queue instanceof LeafQueue && queues.containsKey(queueName) && queues
+        .get(queueName) instanceof LeafQueue) {
       throw new IOException("Two leaf queues were named " + queueName
           + ". Leaf queue names must be distinct");
     }
@@ -312,27 +321,46 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
           if (oldQueue.getState() == QueueState.STOPPED) {
             LOG.info("Deleting Queue " + queueName + ", as it is not"
                 + " present in the modified capacity configuration xml");
-          } else {
+          } else{
             throw new IOException(oldQueue.getQueuePath() + " is deleted from"
                 + " the new capacity scheduler configuration, but the"
-                + " queue is not yet in stopped state. "
-                + "Current State : " + oldQueue.getState());
+                + " queue is not yet in stopped state. " + "Current State : "
+                + oldQueue.getState());
           }
         } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
           //Queue's cannot be moved from one hierarchy to other
-          throw new IOException(queueName + " is moved from:"
-              + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
-              + " after refresh, which is not allowed.");
-        } else  if (oldQueue instanceof LeafQueue
+          throw new IOException(
+              queueName + " is moved from:" + oldQueue.getQueuePath() + " to:"
+                  + newQueue.getQueuePath()
+                  + " after refresh, which is not allowed.");
+        } else if (oldQueue instanceof ParentQueue
+            && !(oldQueue instanceof ManagedParentQueue)
+            && newQueue instanceof ManagedParentQueue) {
+          throw new IOException(
+              "Can not convert parent queue: " + oldQueue.getQueuePath()
+                  + " to auto create enabled parent queue since "
+                  + "it could have other pre-configured queues which is not "
+                  + "supported");
+        } else if (oldQueue instanceof ManagedParentQueue
+            && !(newQueue instanceof ManagedParentQueue)) {
+          throw new IOException(
+              "Cannot convert auto create enabled parent queue: " + oldQueue
+                  .getQueuePath() + " to leaf queue. Please check "
+                  + " parent queue's configuration "
+                  + CapacitySchedulerConfiguration
+                  .AUTO_CREATE_CHILD_QUEUE_ENABLED
+                  + " is set to true");
+        } else if (oldQueue instanceof LeafQueue
             && newQueue instanceof ParentQueue) {
           if (oldQueue.getState() == QueueState.STOPPED) {
             LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
                 + " to parent queue.");
-          } else {
-            throw new IOException("Can not convert the leaf queue: "
-                + oldQueue.getQueuePath() + " to parent queue since "
-                + "it is not yet in stopped state. Current State : "
-                + oldQueue.getState());
+          } else{
+            throw new IOException(
+                "Can not convert the leaf queue: " + oldQueue.getQueuePath()
+                    + " to parent queue since "
+                    + "it is not yet in stopped state. Current State : "
+                    + oldQueue.getState());
           }
         } else if (oldQueue instanceof ParentQueue
             && newQueue instanceof LeafQueue) {
@@ -352,6 +380,7 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
    */
   private void updateQueues(Map<String, CSQueue> existingQueues,
       Map<String, CSQueue> newQueues) {
+    CapacitySchedulerConfiguration conf = csContext.getConfiguration();
     for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
       String queueName = e.getKey();
       CSQueue queue = e.getValue();
@@ -363,7 +392,13 @@ public class CapacitySchedulerQueueManager implements 
SchedulerQueueManager<
         .iterator(); itr.hasNext();) {
       Map.Entry<String, CSQueue> e = itr.next();
       String queueName = e.getKey();
-      if (!newQueues.containsKey(queueName)) {
+      CSQueue existingQueue = e.getValue();
+
+      //TODO - Handle case when auto create is disabled on parent queues
+      if (!newQueues.containsKey(queueName) && !(
+          existingQueue instanceof AutoCreatedLeafQueue && conf
+              .isAutoCreateChildQueueEnabled(
+                  existingQueue.getParent().getQueuePath()))) {
         itr.remove();
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
new file mode 100644
index 0000000..ff795e4
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .SchedulerDynamicEditException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Auto Creation enabled Parent queue. This queue initially does not have any
+ * children to start with and all child
+ * leaf queues will be auto created. Currently this does not allow other
+ * pre-configured leaf or parent queues to
+ * co-exist along with auto-created leaf queues. The auto creation is limited
+ * to leaf queues currently.
+ */
+public class ManagedParentQueue extends AbstractManagedParentQueue {
+
+  private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false;
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ManagedParentQueue.class);
+
+  public ManagedParentQueue(final CapacitySchedulerContext cs,
+      final String queueName, final CSQueue parent, final CSQueue old)
+      throws IOException {
+    super(cs, queueName, parent, old);
+    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
+        csContext.getConfiguration());
+    this.leafQueueTemplate = initializeLeafQueueConfigs(
+        leafQueueTemplateConfPrefix).build();
+
+    StringBuffer queueInfo = new StringBuffer();
+    queueInfo.append("Created Managed Parent Queue: ").append(queueName).append(
+        "]\nwith capacity: [").append(super.getCapacity()).append(
+        "]\nwith max capacity: [").append(super.getMaximumCapacity()).append(
+        "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append(
+        "]\nwith max apps per user: [").append(
+        leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [")
+        .append(leafQueueTemplate.getUserLimit()).append(
+        "]\nwith user limit factor: [").append(
+        leafQueueTemplate.getUserLimitFactor()).append("].");
+    LOG.info(queueInfo.toString());
+  }
+
+  @Override
+  public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
+      throws IOException {
+    validate(newlyParsedQueue);
+    super.reinitialize(newlyParsedQueue, clusterResource);
+    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
+        csContext.getConfiguration());
+    this.leafQueueTemplate = initializeLeafQueueConfigs(
+        leafQueueTemplateConfPrefix).build();
+  }
+
+  @Override
+  protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs(
+      String queuePath) {
+
+    AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate =
+        super.initializeLeafQueueConfigs(queuePath);
+
+    CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(conf);
+    QueueCapacities queueCapacities = new QueueCapacities(false);
+    CSQueueUtils.loadUpdateAndCheckCapacities(leafQueueTemplateConfPrefix,
+        csContext.getConfiguration(), queueCapacities, getQueueCapacities());
+    leafQueueTemplate.capacities(queueCapacities);
+
+    shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
+        conf.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
+            getQueuePath());
+
+    return leafQueueTemplate;
+  }
+
+  protected void validate(final CSQueue newlyParsedQueue) throws IOException {
+    // Sanity check
+    if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue
+        .getQueuePath().equals(getQueuePath())) {
+      throw new IOException(
+          "Trying to reinitialize " + getQueuePath() + " from "
+              + newlyParsedQueue.getQueuePath());
+    }
+  }
+
+  @Override
+  public void addChildQueue(CSQueue childQueue)
+      throws SchedulerDynamicEditException {
+    try {
+      writeLock.lock();
+
+      if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) {
+        throw new SchedulerDynamicEditException(
+            "Expected child queue to be an instance of AutoCreatedLeafQueue");
+      }
+
+      CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+      ManagedParentQueue parentQueue =
+          (ManagedParentQueue) childQueue.getParent();
+
+      String leafQueueName = childQueue.getQueueName();
+      int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit(
+          parentQueue.getQueuePath());
+
+      if (parentQueue.getChildQueues().size() >= maxQueues) {
+        throw new SchedulerDynamicEditException(
+            "Cannot auto create leaf queue " + leafQueueName + ".Max Child "
+                + "Queue limit exceeded which is configured as : " + maxQueues
+                + " and number of child queues is : " + parentQueue
+                .getChildQueues().size());
+      }
+
+      if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
+        if (getLeafQueueTemplate().getQueueCapacities().getAbsoluteCapacity()
+            + parentQueue.sumOfChildAbsCapacities() > parentQueue
+            .getAbsoluteCapacity()) {
+          throw new SchedulerDynamicEditException(
+              "Cannot auto create leaf queue " + leafQueueName + ". Child "
+                  + "queues capacities have reached parent queue : "
+                  + parentQueue.getQueuePath() + " guaranteed capacity");
+        }
+      }
+
+      AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
+      super.addChildQueue(leafQueue);
+      //TODO - refresh policy queue after capacity management is added
+
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) {
+    return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath());
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index d61951b..959ca51 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -1081,17 +1081,4 @@ public class ParentQueue extends AbstractCSQueue {
   public QueueOrderingPolicy getQueueOrderingPolicy() {
     return queueOrderingPolicy;
   }
-
-  protected float sumOfChildCapacities() {
-    try {
-      writeLock.lock();
-      float ret = 0;
-      for (CSQueue l : childQueues) {
-        ret += l.getCapacity();
-      }
-      return ret;
-    } finally {
-      writeLock.unlock();
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0987a7b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 4ab2e9f..b7f8aa6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -40,6 +40,19 @@ public class PlanQueue extends AbstractManagedParentQueue {
   public PlanQueue(CapacitySchedulerContext cs, String queueName,
       CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
+    this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build();
+
+    StringBuffer queueInfo = new StringBuffer();
+    queueInfo.append("Created Plan Queue: ").append(queueName).append(
+        "]\nwith capacity: [").append(super.getCapacity()).append(
+        "]\nwith max capacity: [").append(super.getMaximumCapacity()).append(
+        "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append(
+        "]\nwith max apps per user: [").append(
+        leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [")
+        .append(leafQueueTemplate.getUserLimit()).append(
+        "]\nwith user limit factor: [").append(
+        leafQueueTemplate.getUserLimitFactor()).append("].");
+    LOG.info(queueInfo.toString());
   }
 
   @Override
@@ -47,17 +60,21 @@ public class PlanQueue extends AbstractManagedParentQueue {
       throws IOException {
     validate(newlyParsedQueue);
     super.reinitialize(newlyParsedQueue, clusterResource);
+    this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build();
   }
 
   @Override
-  protected void initializeLeafQueueConfigs() {
-    String queuePath = super.getQueuePath();
+  protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs
+      (String queuePath) {
+    AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = super
+        .initializeLeafQueueConfigs
+        (queuePath);
     showReservationsAsQueues = csContext.getConfiguration()
         .getShowReservationAsQueues(queuePath);
-    super.initializeLeafQueueConfigs();
+    return leafQueueTemplate;
   }
 
-  private void validate(final CSQueue newlyParsedQueue) throws IOException {
+  protected void validate(final CSQueue newlyParsedQueue) throws IOException {
     // Sanity check
     if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
         .getQueuePath().equals(getQueuePath())) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to