[09/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan

2016-11-11 Thread aengineer
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
--
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
new file mode 100644
index 000..8b4907b
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
+
+/**
+ * Contexts for a container inside scheduler
+ */
+public class SchedulerContainer {
+  private RMContainer rmContainer;
+  private String nodePartition;
+  private A schedulerApplicationAttempt;
+  private N schedulerNode;
+  private boolean allocated; // Allocated (True) or reserved (False)
+
+  public SchedulerContainer(A app, N node, RMContainer rmContainer,
+  String nodePartition, boolean allocated) {
+this.schedulerApplicationAttempt = app;
+this.schedulerNode = node;
+this.rmContainer = rmContainer;
+this.nodePartition = nodePartition;
+this.allocated = allocated;
+  }
+
+  public String getNodePartition() {
+return nodePartition;
+  }
+
+  public RMContainer getRmContainer() {
+return rmContainer;
+  }
+
+  public A getSchedulerApplicationAttempt() {
+return schedulerApplicationAttempt;
+  }
+
+  public N getSchedulerNode() {
+return schedulerNode;
+  }
+
+  public boolean isAllocated() {
+return allocated;
+  }
+
+  public SchedulerRequestKey getSchedulerRequestKey() {
+if (rmContainer.getState() == RMContainerState.RESERVED) {
+  return rmContainer.getReservedSchedulerKey();
+}
+return rmContainer.getAllocatedSchedulerKey();
+  }
+
+  @Override
+  public String toString() {
+return "(Application=" + schedulerApplicationAttempt
+.getApplicationAttemptId() + "; Node=" + schedulerNode.getNodeID()
++ "; Resource=" + rmContainer.getAllocatedOrReservedResource() + ")";
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
--
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index ebe70d4..6d9dda8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -18,14 +18,7 @@
 
 package 

[09/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan

2016-11-10 Thread kasha
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
--
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 42a8872..d875969 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -49,8 +49,12 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -123,6 +127,27 @@ public class TestParentQueue {
 return application;
   }
 
+  private void applyAllocationToQueue(Resource clusterResource,
+  int allocatedMem,
+  CSQueue queue) {
+// Call accept & apply for queue
+ResourceCommitRequest request = mock(ResourceCommitRequest.class);
+when(request.anythingAllocatedOrReserved()).thenReturn(true);
+ContainerAllocationProposal allocation = mock(
+ContainerAllocationProposal.class);
+when(request.getTotalReleasedResource()).thenReturn(Resources.none());
+
when(request.getFirstAllocatedOrReservedContainer()).thenReturn(allocation);
+SchedulerContainer scontainer = mock(SchedulerContainer.class);
+when(allocation.getAllocatedOrReservedContainer()).thenReturn(scontainer);
+when(allocation.getAllocatedOrReservedResource()).thenReturn(
+Resources.createResource(allocatedMem));
+when(scontainer.getNodePartition()).thenReturn("");
+
+if (queue.accept(clusterResource, request)) {
+  queue.apply(clusterResource, request);
+}
+  }
+
   private void stubQueueAllocation(final CSQueue queue, 
   final Resource clusterResource, final FiCaSchedulerNode node, 
   final int allocation) {
@@ -157,7 +182,7 @@ public class TestParentQueue {
 // Next call - nothing
 if (allocation > 0) {
   doReturn(new CSAssignment(Resources.none(), type)).when(queue)
-  .assignContainers(eq(clusterResource), eq(node),
+  .assignContainers(eq(clusterResource), any(PlacementSet.class),
   any(ResourceLimits.class), any(SchedulingMode.class));
 
   // Mock the node's resource availability
@@ -168,7 +193,7 @@ public class TestParentQueue {
 
 return new CSAssignment(allocatedResource, type);
   }
-}).when(queue).assignContainers(eq(clusterResource), eq(node),
+}).when(queue).assignContainers(eq(clusterResource), 
any(PlacementSet.class),
 any(ResourceLimits.class), any(SchedulingMode.class));
   }
   
@@ -205,8 +230,8 @@ public class TestParentQueue {
 setupSingleLevelQueues(csConf);
 
 Map queues = new HashMap();
-CSQueue root = 
-CapacityScheduler.parseQueue(csContext, csConf, null, 
+CSQueue root =
+CapacityScheduler.parseQueue(csContext, csConf, null,
 CapacitySchedulerConfiguration.ROOT, queues, queues, 
 TestUtils.spyHook);
 
@@ -245,13 +270,18 @@ public class TestParentQueue {
 // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
 stubQueueAllocation(a, clusterResource, node_1, 2*GB);
 stubQueueAllocation(b, clusterResource, node_1, 1*GB);
-root.assignContainers(clusterResource,