[10/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan

2016-11-11 Thread aengineer
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
--
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index a69af6e..fd0c68b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -47,8 +47,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.io.IOException;
@@ -71,12 +76,10 @@ public class ParentQueue extends AbstractCSQueue {
 
   protected final Set childQueues;  
   private final boolean rootQueue;
-  final Comparator nonPartitionedQueueComparator;
-  final PartitionedQueueComparator partitionQueueComparator;
-  volatile int numApplications;
+  private final Comparator nonPartitionedQueueComparator;
+  private final PartitionedQueueComparator partitionQueueComparator;
+  private volatile int numApplications;
   private final CapacitySchedulerContext scheduler;
-  private boolean needToResortQueuesAtNextAllocation = false;
-  private int offswitchPerHeartbeatLimit;
 
   private final RecordFactory recordFactory = 
 RecordFactoryProvider.getRecordFactory(null);
@@ -86,7 +89,7 @@ public class ParentQueue extends AbstractCSQueue {
 super(cs, queueName, parent, old);
 this.scheduler = cs;
 this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator();
-this.partitionQueueComparator = cs.getPartitionedQueueComparator();
+this.partitionQueueComparator = new PartitionedQueueComparator();
 
 this.rootQueue = (parent == null);
 
@@ -126,16 +129,12 @@ public class ParentQueue extends AbstractCSQueue {
 }
   }
 
-  offswitchPerHeartbeatLimit =
-csContext.getConfiguration().getOffSwitchPerHeartbeatLimit();
-
   LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity()
   + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
   + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
   + ", absoluteMaxCapacity=" + this.queueCapacities
   .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls="
   + aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
-  + ", offswitchPerHeartbeatLimit = " + getOffSwitchPerHeartbeatLimit()
   + ", reservationsContinueLooking=" + reservationsContinueLooking);
 } finally {
   writeLock.unlock();
@@ -215,11 +214,6 @@ public class ParentQueue extends AbstractCSQueue {
 
   }
 
-  @Private
-  public int getOffSwitchPerHeartbeatLimit() {
-return offswitchPerHeartbeatLimit;
-  }
-
   private QueueUserACLInfo getUserAclInfo(
   UserGroupInformation user) {
 try {
@@ -435,156 +429,145 @@ public class ParentQueue extends AbstractCSQueue {
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-  FiCaSchedulerNode node, ResourceLimits resourceLimits,
-  SchedulingMode schedulingMode) {
-int offswitchCount = 0;
-try {
-  writeLock.lock();
-  // if our queue cannot access this node, just return
-  if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-  && 

[10/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan

2016-11-10 Thread kasha
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
--
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 51b567b..8694efb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -78,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestK
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -196,6 +199,7 @@ public class TestLeafQueue {
 
 cs.setRMContext(spyRMContext);
 cs.init(csConf);
+cs.setResourceCalculator(rC);
 cs.start();
 
 when(spyRMContext.getScheduler()).thenReturn(cs);
@@ -268,6 +272,12 @@ public class TestLeafQueue {
 any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
 any(RMContainer.class), any(ContainerStatus.class), 
 any(RMContainerEventType.class), any(CSQueue.class), anyBoolean());
+
+// Stub out parent queue's accept and apply.
+doReturn(true).when(parent).accept(any(Resource.class),
+any(ResourceCommitRequest.class));
+doNothing().when(parent).apply(any(Resource.class),
+any(ResourceCommitRequest.class));
 
 return queue;
   }
@@ -339,6 +349,12 @@ public class TestLeafQueue {
 FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
 8*GB);
 
+Map apps = ImmutableMap.of(
+app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+app_1);
+Map nodes = ImmutableMap.of(node_0.getNodeID(),
+node_0);
+
 final int numNodes = 1;
 Resource clusterResource = 
 Resources.createResource(numNodes * (8*GB), numNodes * 16);
@@ -353,8 +369,10 @@ public class TestLeafQueue {
 // Start testing...
 
 // Only 1 container
-a.assignContainers(clusterResource, node_0, new ResourceLimits(
-clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+applyCSAssignment(clusterResource,
+a.assignContainers(clusterResource, node_0,
+new ResourceLimits(clusterResource),
+SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
 assertEquals(
 (int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB),
 a.getMetrics().getAvailableMB());
@@ -526,6 +544,12 @@ public class TestLeafQueue {
 FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
 8*GB);
 
+Map apps = ImmutableMap.of(
+app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+app_1);
+Map nodes = ImmutableMap.of(node_0.getNodeID(),
+node_0);
+
 final int numNodes =