[10/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java -- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index a69af6e..fd0c68b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -47,8 +47,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.util.resource.Resources; import java.io.IOException; @@ -71,12 +76,10 @@ public class ParentQueue extends AbstractCSQueue { protected final Set childQueues; private final boolean rootQueue; - final Comparator nonPartitionedQueueComparator; - final PartitionedQueueComparator partitionQueueComparator; - volatile int numApplications; + private final Comparator nonPartitionedQueueComparator; + private final PartitionedQueueComparator partitionQueueComparator; + private volatile int numApplications; private final CapacitySchedulerContext scheduler; - private boolean needToResortQueuesAtNextAllocation = false; - private int offswitchPerHeartbeatLimit; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -86,7 +89,7 @@ public class ParentQueue extends AbstractCSQueue { super(cs, queueName, parent, old); this.scheduler = cs; this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator(); -this.partitionQueueComparator = cs.getPartitionedQueueComparator(); +this.partitionQueueComparator = new PartitionedQueueComparator(); this.rootQueue = (parent == null); @@ -126,16 +129,12 @@ public class ParentQueue extends AbstractCSQueue { } } - offswitchPerHeartbeatLimit = -csContext.getConfiguration().getOffSwitchPerHeartbeatLimit(); - LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity() + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + ", absoluteMaxCapacity=" + this.queueCapacities .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" - + ", offswitchPerHeartbeatLimit = " + getOffSwitchPerHeartbeatLimit() + ", reservationsContinueLooking=" + reservationsContinueLooking); } finally { writeLock.unlock(); @@ -215,11 +214,6 @@ public class ParentQueue extends AbstractCSQueue { } - @Private - public int getOffSwitchPerHeartbeatLimit() { -return offswitchPerHeartbeatLimit; - } - private QueueUserACLInfo getUserAclInfo( UserGroupInformation user) { try { @@ -435,156 +429,145 @@ public class ParentQueue extends AbstractCSQueue { @Override public CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits resourceLimits, - SchedulingMode schedulingMode) { -int offswitchCount = 0; -try { - writeLock.lock(); - // if our queue cannot access this node, just return - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - &&
[10/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java -- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 51b567b..8694efb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier; +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -49,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -78,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestK import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -196,6 +199,7 @@ public class TestLeafQueue { cs.setRMContext(spyRMContext); cs.init(csConf); +cs.setResourceCalculator(rC); cs.start(); when(spyRMContext.getScheduler()).thenReturn(cs); @@ -268,6 +272,12 @@ public class TestLeafQueue { any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(RMContainer.class), any(ContainerStatus.class), any(RMContainerEventType.class), any(CSQueue.class), anyBoolean()); + +// Stub out parent queue's accept and apply. +doReturn(true).when(parent).accept(any(Resource.class), +any(ResourceCommitRequest.class)); +doNothing().when(parent).apply(any(Resource.class), +any(ResourceCommitRequest.class)); return queue; } @@ -339,6 +349,12 @@ public class TestLeafQueue { FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); +Mapapps = ImmutableMap.of( +app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), +app_1); +Map nodes = ImmutableMap.of(node_0.getNodeID(), +node_0); + final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); @@ -353,8 +369,10 @@ public class TestLeafQueue { // Start testing... // Only 1 container -a.assignContainers(clusterResource, node_0, new ResourceLimits( -clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); +applyCSAssignment(clusterResource, +a.assignContainers(clusterResource, node_0, +new ResourceLimits(clusterResource), +SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); assertEquals( (int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); @@ -526,6 +544,12 @@ public class TestLeafQueue { FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); +Map apps = ImmutableMap.of( +app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), +app_1); +Map nodes = ImmutableMap.of(node_0.getNodeID(), +node_0); + final int numNodes =