YARN-4026. Refactored ContainerAllocator to accept a list of priorities rather
than a single priority. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e5003be9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e5003be9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e5003be9

Branch: refs/heads/HDFS-7285
Commit: e5003be907acef87c2770e3f2914953f62017b0e
Parents: 1c12adb
Author: Jian He <jia...@apache.org>
Authored: Wed Aug 12 15:07:50 2015 -0700
Committer: Jian He <jia...@apache.org>
Committed: Wed Aug 12 15:07:50 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/capacity/LeafQueue.java           |   7 +-
 .../capacity/allocator/ContainerAllocation.java |  33 +++--
 .../capacity/allocator/ContainerAllocator.java  | 109 +++++++++-----
 .../allocator/RegularContainerAllocator.java    | 123 ++++++++++++----
 .../scheduler/common/fica/FiCaSchedulerApp.java | 103 +------------
 .../scheduler/capacity/TestLeafQueue.java       | 144 +++++++++++++++----
 7 files changed, 315 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5003be9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 199a930..4c70a8a 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -393,6 +393,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3966. Fix excessive loggings in CapacityScheduler. (Jian He via wangda)
 
+    YARN-4026. Refactored ContainerAllocator to accept a list of priorities
+    rather than a single priority. (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5003be9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5976f58..ff1baff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -763,8 +763,9 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerApp application =
           getApplication(reservedContainer.getApplicationAttemptId());
       synchronized (application) {
-        CSAssignment assignment = application.assignReservedContainer(node, reservedContainer,
-            clusterResource, schedulingMode);
+        CSAssignment assignment =
+            application.assignContainers(clusterResource, node,
+                currentResourceLimits, schedulingMode, reservedContainer);
         handleExcessReservedContainer(clusterResource, assignment);
         return assignment;
       }
@@ -812,7 +813,7 @@ public class LeafQueue extends AbstractCSQueue {
       // Try to schedule
       CSAssignment assignment =
           application.assignContainers(clusterResource, node,
-              currentResourceLimits, schedulingMode);
+              currentResourceLimits, schedulingMode, null);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("post-assignContainers for application "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5003be9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
index 00c1bb9..1df9410 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
@@ -25,18 +25,31 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class ContainerAllocation {
+  /**
+   * Skip the locality (e.g. node-local, rack-local, any), and look at other
+   * localities of the same priority
+   */
+  public static final ContainerAllocation LOCALITY_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
+
+  /**
+   * Skip the priority, and look at other priorities of the same application
+   */
   public static final ContainerAllocation PRIORITY_SKIPPED =
       new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);
-  
+
+  /**
+   * Skip the application, and look at other applications of the same queue
+   */
   public static final ContainerAllocation APP_SKIPPED =
       new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);
 
+  /**
+   * Skip the leaf-queue, and look at other queues of the same parent queue
+   */
   public static final ContainerAllocation QUEUE_SKIPPED =
       new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
-  
-  public static final ContainerAllocation LOCALITY_SKIPPED =
-      new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
-  
+
   RMContainer containerToBeUnreserved;
   private Resource resourceToBeAllocated = Resources.none();
   AllocationState state;
@@ -50,26 +63,26 @@ public class ContainerAllocation {
     this.resourceToBeAllocated = resourceToBeAllocated;
     this.state = state;
   }
-  
+
   public RMContainer getContainerToBeUnreserved() {
     return containerToBeUnreserved;
   }
-  
+
   public Resource getResourceToBeAllocated() {
     if (resourceToBeAllocated == null) {
       return Resources.none();
     }
     return resourceToBeAllocated;
   }
-  
+
   public AllocationState getAllocationState() {
     return state;
   }
-  
+
   public NodeType getContainerNodeType() {
     return containerNodeType;
   }
-  
+
   public Container getUpdatedContainer() {
     return updatedContainer;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5003be9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index b4168dd..6e296cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
 
-import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -36,6 +39,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  * extensible.
  */
 public abstract class ContainerAllocator {
+  private static final Log LOG = LogFactory.getLog(ContainerAllocator.class);
+
   FiCaSchedulerApp application;
   final ResourceCalculator rc;
   final RMContext rmContext;
@@ -46,27 +51,8 @@ public abstract class ContainerAllocator {
     this.rc = rc;
     this.rmContext = rmContext;
   }
-  
-  /**
-   * preAllocation is to perform checks, etc. to see if we can/cannot allocate
-   * container. It will put necessary information to returned
-   * {@link ContainerAllocation}. 
-   */
-  abstract ContainerAllocation preAllocation(
-      Resource clusterResource, FiCaSchedulerNode node,
-      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
-      Priority priority, RMContainer reservedContainer);
-  
-  /**
-   * doAllocation is to update application metrics, create containers, etc.
-   * According to allocating conclusion decided by preAllocation.
-   */
-  abstract ContainerAllocation doAllocation(
-      ContainerAllocation allocationResult, Resource clusterResource,
-      FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
-      RMContainer reservedContainer);
-  
-  boolean checkHeadroom(Resource clusterResource,
+
+  protected boolean checkHeadroom(Resource clusterResource,
       ResourceLimits currentResourceLimits, Resource required,
       FiCaSchedulerNode node) {
     // If headroom + currentReservation < required, we cannot allocate this
@@ -83,6 +69,68 @@ public abstract class ContainerAllocator {
         currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
         required);
   }
+
+  /**
+   * Converts the outcome of an allocation attempt into the
+   * {@link CSAssignment} handed back to the leaf queue.
+   *
+   * @param rmContainer the reserved container being served, or null
+   * @return the assignment; flagged as skipped when the result is APP_SKIPPED
+   */
+  protected CSAssignment getCSAssignmentFromAllocateResult(
+      Resource clusterResource, ContainerAllocation result,
+      RMContainer rmContainer) {
+    CSAssignment assignment = new CSAssignment(
+        result.getAllocationState() == AllocationState.APP_SKIPPED);
+    assignment.setApplication(application);
+    assignment.setExcessReservation(result.getContainerToBeUnreserved());
+
+    // Nothing else to record unless some resource was allocated/reserved
+    Resource allocatedResource = result.getResourceToBeAllocated();
+    if (Resources.greaterThan(rc, clusterResource, allocatedResource,
+        Resources.none())) {
+      Container updatedContainer = result.getUpdatedContainer();
+      // Log the leaf queue's path; "this" is the allocator, not a queue
+      String queuePath = application.getCSLeafQueue().getQueuePath();
+      assignment.setResource(allocatedResource);
+      assignment.setType(result.getContainerNodeType());
+
+      if (result.getAllocationState() == AllocationState.RESERVED) {
+        // This is a reserved container
+        LOG.info("Reserved container " + " application="
+            + application.getApplicationId() + " resource=" + allocatedResource
+            + " queue=" + queuePath + " cluster=" + clusterResource);
+        assignment.getAssignmentInformation().addReservationDetails(
+            updatedContainer.getId(), queuePath);
+        assignment.getAssignmentInformation().incrReservations();
+        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
+            allocatedResource);
+      } else if (result.getAllocationState() == AllocationState.ALLOCATED) {
+        // This is a new container
+        LOG.info("assignedContainer" + " application attempt="
+            + application.getApplicationAttemptId() + " container="
+            + updatedContainer.getId() + " queue=" + queuePath
+            + " clusterResource=" + clusterResource);
+
+        // Inform the ordering policy
+        application.getCSLeafQueue().getOrderingPolicy().containerAllocated(
+            application, application.getRMContainer(updatedContainer.getId()));
+
+        assignment.getAssignmentInformation().addAllocationDetails(
+            updatedContainer.getId(), queuePath);
+        assignment.getAssignmentInformation().incrAllocations();
+        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+            allocatedResource);
+
+        // Allocating into a reserved container fulfills that reservation
+        if (rmContainer != null) {
+          assignment.setFulfilledReservation(true);
+        }
+      }
+    }
+
+    return assignment;
+  }
   
   /**
    * allocate needs to handle following stuffs:
@@ -96,20 +144,7 @@ public abstract class ContainerAllocator {
    * container, this will also update metrics</li>
    * </ul>
    */
-  public ContainerAllocation allocate(Resource clusterResource,
+  public abstract CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, Priority priority,
-      RMContainer reservedContainer) {
-    ContainerAllocation result =
-        preAllocation(clusterResource, node, schedulingMode,
-            resourceLimits, priority, reservedContainer);
-    
-    if (AllocationState.ALLOCATED == result.state
-        || AllocationState.RESERVED == result.state) {
-      result = doAllocation(result, clusterResource, node,
-          schedulingMode, priority, reservedContainer);
-    }
-    
-    return result;
-  }
+      ResourceLimits resourceLimits, RMContainer reservedContainer);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5003be9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 6effcd3..dcb99ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -154,7 +155,6 @@ public class RegularContainerAllocator extends ContainerAllocator {
     return null;
   }
 
-  @Override
   ContainerAllocation preAllocation(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
       ResourceLimits resourceLimits, Priority priority,
@@ -295,14 +295,14 @@ public class RegularContainerAllocator extends ContainerAllocator {
           schedulingMode, currentResoureLimits);
     }
 
-    return ContainerAllocation.QUEUE_SKIPPED;
+    return ContainerAllocation.APP_SKIPPED;
   }
 
   private ContainerAllocation assignContainersOnNode(Resource clusterResource,
       FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
       SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
 
-    ContainerAllocation assigned;
+    ContainerAllocation allocation;
 
     NodeType requestType = null;
     // Data-local
@@ -310,14 +310,14 @@ public class RegularContainerAllocator extends ContainerAllocator {
         application.getResourceRequest(priority, node.getNodeName());
     if (nodeLocalResourceRequest != null) {
       requestType = NodeType.NODE_LOCAL;
-      assigned =
+      allocation =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
               node, priority, reservedContainer, schedulingMode,
               currentResoureLimits);
       if (Resources.greaterThan(rc, clusterResource,
-          assigned.getResourceToBeAllocated(), Resources.none())) {
-        assigned.requestNodeType = requestType;
-        return assigned;
+          allocation.getResourceToBeAllocated(), Resources.none())) {
+        allocation.requestNodeType = requestType;
+        return allocation;
       }
     }
 
@@ -333,14 +333,14 @@ public class RegularContainerAllocator extends ContainerAllocator {
         requestType = NodeType.RACK_LOCAL;
       }
 
-      assigned =
+      allocation =
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
               node, priority, reservedContainer, schedulingMode,
               currentResoureLimits);
       if (Resources.greaterThan(rc, clusterResource,
-          assigned.getResourceToBeAllocated(), Resources.none())) {
-        assigned.requestNodeType = requestType;
-        return assigned;
+          allocation.getResourceToBeAllocated(), Resources.none())) {
+        allocation.requestNodeType = requestType;
+        return allocation;
       }
     }
 
@@ -356,13 +356,19 @@ public class RegularContainerAllocator extends ContainerAllocator {
         requestType = NodeType.OFF_SWITCH;
       }
 
-      assigned =
+      allocation =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
               node, priority, reservedContainer, schedulingMode,
               currentResoureLimits);
-      assigned.requestNodeType = requestType;
+      allocation.requestNodeType = requestType;
+      
+      // When a returned allocation is LOCALITY_SKIPPED, since we're in
+      // off-switch request now, we will skip this app w.r.t priorities 
+      if (allocation.state == AllocationState.LOCALITY_SKIPPED) {
+        allocation.state = AllocationState.APP_SKIPPED;
+      }
 
-      return assigned;
+      return allocation;
     }
 
     return ContainerAllocation.PRIORITY_SKIPPED;
@@ -388,7 +394,7 @@ public class RegularContainerAllocator extends ContainerAllocator {
       // to label not match. This can be caused by node label changed
       // We should un-reserve this container.
       return new ContainerAllocation(rmContainer, null,
-          AllocationState.QUEUE_SKIPPED);
+          AllocationState.LOCALITY_SKIPPED);
     }
 
     Resource capability = request.getCapability();
@@ -400,7 +406,8 @@ public class RegularContainerAllocator extends ContainerAllocator {
       LOG.warn("Node : " + node.getNodeID()
           + " does not have sufficient resource for request : " + request
           + " node total capability : " + node.getTotalResource());
-      return ContainerAllocation.QUEUE_SKIPPED;
+      // Skip this locality request
+      return ContainerAllocation.LOCALITY_SKIPPED;
     }
 
     assert Resources.greaterThan(
@@ -457,7 +464,8 @@ public class RegularContainerAllocator extends ContainerAllocator {
           // continue)). If we failed to unreserve some resource, we can't
           // continue.
           if (null == unreservedContainer) {
-            return ContainerAllocation.QUEUE_SKIPPED;
+            // Skip the locality request
+            return ContainerAllocation.LOCALITY_SKIPPED;
           }
         }
       }
@@ -468,19 +476,20 @@ public class RegularContainerAllocator extends ContainerAllocator {
       result.containerNodeType = type;
       return result;
     } else {
-      // if we are allowed to allocate but this node doesn't have space, reserve it or
-      // if this was an already a reserved container, reserve it again
+      // if we are allowed to allocate but this node doesn't have space, reserve
+      // it or if this was an already a reserved container, reserve it again
       if (shouldAllocOrReserveNewContainer || rmContainer != null) {
 
         if (reservationsContinueLooking && rmContainer == null) {
           // we could possibly ignoring queue capacity or user limits when
-          // reservationsContinueLooking is set. Make sure we didn't need to unreserve
-          // one.
+          // reservationsContinueLooking is set. Make sure we didn't need to
+          // unreserve one.
           if (needToUnreserve) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("we needed to unreserve to be able to allocate");
             }
-            return ContainerAllocation.QUEUE_SKIPPED;
+            // Skip the locality request
+            return ContainerAllocation.LOCALITY_SKIPPED;          
           }
         }
 
@@ -490,7 +499,8 @@ public class RegularContainerAllocator extends ContainerAllocator {
         result.containerNodeType = type;
         return result;
       }
-      return ContainerAllocation.QUEUE_SKIPPED;
+      // Skip the locality request
+      return ContainerAllocation.LOCALITY_SKIPPED;    
     }
   }
 
@@ -563,8 +573,7 @@ public class RegularContainerAllocator extends ContainerAllocator {
       // Skip this app if we failed to allocate.
       ContainerAllocation ret =
           new ContainerAllocation(allocationResult.containerToBeUnreserved,
-              null, AllocationState.QUEUE_SKIPPED);
-      ret.state = AllocationState.APP_SKIPPED;
+              null, AllocationState.APP_SKIPPED);
       return ret;
     }
 
@@ -578,7 +587,6 @@ public class RegularContainerAllocator extends ContainerAllocator {
     return allocationResult;    
   }
 
-  @Override
   ContainerAllocation doAllocation(ContainerAllocation allocationResult,
       Resource clusterResource, FiCaSchedulerNode node,
       SchedulingMode schedulingMode, Priority priority,
@@ -591,7 +599,7 @@ public class RegularContainerAllocator extends ContainerAllocator {
     // something went wrong getting/creating the container
     if (container == null) {
       LOG.warn("Couldn't get container for allocation!");
-      return ContainerAllocation.QUEUE_SKIPPED;
+      return ContainerAllocation.APP_SKIPPED;
     }
 
     if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
@@ -626,4 +634,65 @@ public class RegularContainerAllocator extends ContainerAllocator {
 
     return allocationResult;
   }
+  
+  private ContainerAllocation allocate(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits, Priority priority,
+      RMContainer reservedContainer) {
+    ContainerAllocation result =
+        preAllocation(clusterResource, node, schedulingMode, resourceLimits,
+            priority, reservedContainer);
+
+    if (AllocationState.ALLOCATED == result.state
+        || AllocationState.RESERVED == result.state) {
+      result =
+          doAllocation(result, clusterResource, node, schedulingMode, priority,
+              reservedContainer);
+    }
+
+    return result;
+  }
+  
+  @Override
+  public CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits,
+      RMContainer reservedContainer) {
+    if (reservedContainer == null) {
+      // Check if application needs more resource, skip if it doesn't need more.
+      if (!application.hasPendingResourceRequest(rc,
+          node.getPartition(), clusterResource, schedulingMode)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+              + ", because it doesn't need more resource, schedulingMode="
+              + schedulingMode.name() + " node-label=" + node.getPartition());
+        }
+        return CSAssignment.SKIP_ASSIGNMENT;
+      }
+      
+      // Schedule in priority order
+      for (Priority priority : application.getPriorities()) {
+        ContainerAllocation result =
+            allocate(clusterResource, node, schedulingMode, resourceLimits,
+                priority, null);
+
+        AllocationState allocationState = result.getAllocationState();
+        if (allocationState == AllocationState.PRIORITY_SKIPPED) {
+          continue;
+        }
+        return getCSAssignmentFromAllocateResult(clusterResource, result,
+            null);
+      }
+
+      // We will reach here if we skipped all priorities of the app, so we will
+      // skip the app.
+      return CSAssignment.SKIP_ASSIGNMENT;
+    } else {
+      ContainerAllocation result =
+          allocate(clusterResource, node, schedulingMode, resourceLimits,
+              reservedContainer.getReservedPriority(), reservedContainer);
+      return getCSAssignmentFromAllocateResult(clusterResource, result,
+          reservedContainer);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5003be9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f9a6bc2..74d77f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -57,10 +57,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -280,7 +278,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return ret;
   }
 
-  public synchronized void addPreemptContainer(ContainerId cont){
+  public synchronized void addPreemptContainer(ContainerId cont) {
     // ignore already completed containers
     if (liveContainers.containsKey(cont)) {
       containersToPreempt.add(cont);
@@ -430,112 +428,19 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
   public LeafQueue getCSLeafQueue() {
     return (LeafQueue)queue;
   }
-
-  private CSAssignment getCSAssignmentFromAllocateResult(
-      Resource clusterResource, ContainerAllocation result) {
-    // Handle skipped
-    boolean skipped =
-        (result.getAllocationState() == AllocationState.APP_SKIPPED);
-    CSAssignment assignment = new CSAssignment(skipped);
-    assignment.setApplication(this);
-    
-    // Handle excess reservation
-    assignment.setExcessReservation(result.getContainerToBeUnreserved());
-
-    // If we allocated something
-    if (Resources.greaterThan(rc, clusterResource,
-        result.getResourceToBeAllocated(), Resources.none())) {
-      Resource allocatedResource = result.getResourceToBeAllocated();
-      Container updatedContainer = result.getUpdatedContainer();
-      
-      assignment.setResource(allocatedResource);
-      assignment.setType(result.getContainerNodeType());
-
-      if (result.getAllocationState() == AllocationState.RESERVED) {
-        // This is a reserved container
-        LOG.info("Reserved container " + " application=" + getApplicationId()
-            + " resource=" + allocatedResource + " queue="
-            + this.toString() + " cluster=" + clusterResource);
-        assignment.getAssignmentInformation().addReservationDetails(
-            updatedContainer.getId(), getCSLeafQueue().getQueuePath());
-        assignment.getAssignmentInformation().incrReservations();
-        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-            allocatedResource);
-        assignment.setFulfilledReservation(true);
-      } else {
-        // This is a new container
-        // Inform the ordering policy
-        LOG.info("assignedContainer" + " application attempt="
-            + getApplicationAttemptId() + " container="
-            + updatedContainer.getId() + " queue=" + this + " clusterResource="
-            + clusterResource);
-
-        getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
-            getRMContainer(updatedContainer.getId()));
-
-        assignment.getAssignmentInformation().addAllocationDetails(
-            updatedContainer.getId(), getCSLeafQueue().getQueuePath());
-        assignment.getAssignmentInformation().incrAllocations();
-        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-            allocatedResource);
-      }
-    }
-    
-    return assignment;
-  }
   
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
-      SchedulingMode schedulingMode) {
+      SchedulingMode schedulingMode, RMContainer reservedContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("pre-assignContainers for application "
           + getApplicationId());
       showRequests();
     }
 
-    // Check if application needs more resource, skip if it doesn't need more.
-    if (!hasPendingResourceRequest(rc,
-        node.getPartition(), clusterResource, schedulingMode)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skip app_attempt=" + getApplicationAttemptId()
-            + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-label=" + node.getPartition());
-      }
-      return CSAssignment.SKIP_ASSIGNMENT;
-    }
-
     synchronized (this) {
-      // Schedule in priority order
-      for (Priority priority : getPriorities()) {
-        ContainerAllocation allocationResult =
-            containerAllocator.allocate(clusterResource, node,
-                schedulingMode, currentResourceLimits, priority, null);
-
-        // If it's a skipped allocation
-        AllocationState allocationState = 
allocationResult.getAllocationState();
-
-        if (allocationState == AllocationState.PRIORITY_SKIPPED) {
-          continue;
-        }
-        return getCSAssignmentFromAllocateResult(clusterResource,
-            allocationResult);
-      }
+      return containerAllocator.assignContainers(clusterResource, node,
+          schedulingMode, currentResourceLimits, reservedContainer);
     }
-
-    // We will reach here if we skipped all priorities of the app, so we will
-    // skip the app.
-    return CSAssignment.SKIP_ASSIGNMENT;
-  }
-
-
-  public synchronized CSAssignment assignReservedContainer(
-      FiCaSchedulerNode node, RMContainer rmContainer,
-      Resource clusterResource, SchedulingMode schedulingMode) {
-    ContainerAllocation result =
-        containerAllocator.allocate(clusterResource, node,
-            schedulingMode, new ResourceLimits(Resources.none()),
-            rmContainer.getReservedPriority(), rmContainer);
-
-    return getCSAssignmentFromAllocateResult(clusterResource, result);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e5003be9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 0efadc1..fe8be06 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -622,16 +622,9 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
-        new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
-            a.getActiveUsersManager(), spyRMContext);
-    a.submitApplicationAttempt(app_1, user_0);  // same user
-
-    final ApplicationAttemptId appAttemptId_2 = 
-        TestUtils.getMockApplicationAttemptId(2, 0); 
-    FiCaSchedulerApp app_2 = 
-        new FiCaSchedulerApp(appAttemptId_2, user_1, a, 
+        new FiCaSchedulerApp(appAttemptId_1, user_1, a, 
             a.getActiveUsersManager(), spyRMContext);
-    a.submitApplicationAttempt(app_2, user_1);
+    a.submitApplicationAttempt(app_1, user_1); // different user
 
     // Setup some nodes
     String host_0 = "127.0.0.1";
@@ -647,7 +640,7 @@ public class TestLeafQueue {
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
-            TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
                 priority, recordFactory)));
 
     app_1.updateResourceRequests(Collections.singletonList(
@@ -662,39 +655,38 @@ public class TestLeafQueue {
     a.setUserLimit(50);
     a.setUserLimitFactor(2);
     
-    // Now, only user_0 should be active since he is the only one with
-    // outstanding requests
-    assertEquals("There should only be 1 active user!", 
-        1, a.getActiveUsersManager().getNumActiveUsers());
-
-    // This commented code is key to test 'activeUsers'. 
-    // It should fail the test if uncommented since
-    // it would increase 'activeUsers' to 2 and stop user_2
-    // Pre MAPREDUCE-3732 this test should fail without this block too
-//    app_2.updateResourceRequests(Collections.singletonList(
-//        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
-//            recordFactory)));
+    // There are two active users
+    assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
 
     // 1 container to user_0
     a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    assertEquals(2*GB, a.getUsedResources().getMemory());
-    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
-    // Again one to user_0 since he hasn't exceeded user limit yet
+    // Allocate one container to app_1. Even though app_0 was submitted
+    // earlier, it cannot get this container since user_0 has already
+    // exceeded the user-limit.
     a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    assertEquals(3*GB, a.getUsedResources().getMemory());
-    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
 
-    // One more to user_0 since he is the only active user
+    // Allocate one container to app_0. Before this allocation,
+    // user-limit = ceil((4 + 1) / 2) = 3G, and app_0's used resource (3G)
+    // is <= user-limit.
     a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    assertEquals(4*GB, a.getUsedResources().getMemory());
-    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
-    assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(7*GB, a.getUsedResources().getMemory());
+    assertEquals(6*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    // app_0 has no outstanding requests, so there is only one active user.
+    assertEquals("There should only be 1 active user!", 
+        1, a.getActiveUsersManager().getNumActiveUsers());
+
   }
 
   @Test
@@ -2569,6 +2561,96 @@ public class TestLeafQueue {
     Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
 
   }
+  
+  @Test
+  public void testLocalityDelaySkipsApplication() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // User
+    String user_0 = "user_0";
+    
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            mock(ActiveUsersManager.class), spyRMContext);
+    a.submitApplicationAttempt(app_0, user_0);
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+            mock(ActiveUsersManager.class), spyRMContext);
+    a.submitApplicationAttempt(app_1, user_0);
+
+    // Setup some nodes and racks
+    String host_0 = "127.0.0.1";
+    String rack_0 = "rack_0";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB);
+    
+    String host_1 = "127.0.0.2";
+    String rack_1 = "rack_1";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB);
+    
+    String host_2 = "127.0.0.3";
+    String rack_2 = "rack_2";
+    FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB);
+
+    final int numNodes = 3;
+    Resource clusterResource = 
+        Resources.createResource(numNodes * (8*GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    
+    // Setup resource-requests and submit
+    // app_0 has node-local requests for host_0/host_1, and app_1 has a
+    // node-local request for host_2.
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+
+    List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(host_2, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(rack_2, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+            true, priority, recordFactory));
+    app_1.updateResourceRequests(app_1_requests_0);
+
+    // Start testing...
+    // When allocating, app_1 can still get a container even though app_0 was
+    // submitted earlier, because app_0 is waiting for node-locality-delay.
+    CSAssignment assignment = null;
+    
+    // app_0's scheduling opportunities should increase; app_1 gets allocated
+    assignment = a.assignContainers(clusterResource, node_2,
+        new ResourceLimits(clusterResource), 
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
+    assertEquals(1, app_0.getSchedulingOpportunities(priority));
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+    assertEquals(0, app_0.getLiveContainers().size());
+    assertEquals(1, app_1.getLiveContainers().size());
+  }
 
   private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
       LeafQueue defaultQueue) {

Reply via email to