[09/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java -- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java new file mode 100644 index 000..8b4907b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/SchedulerContainer.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; + +/** + * Contexts for a container inside scheduler + */ +public class SchedulerContainer { + private RMContainer rmContainer; + private String nodePartition; + private A schedulerApplicationAttempt; + private N schedulerNode; + private boolean allocated; // Allocated (True) or reserved (False) + + public SchedulerContainer(A app, N node, RMContainer rmContainer, + String nodePartition, boolean allocated) { +this.schedulerApplicationAttempt = app; +this.schedulerNode = node; +this.rmContainer = rmContainer; +this.nodePartition = nodePartition; +this.allocated = allocated; + } + + public String getNodePartition() { +return nodePartition; + } + + public RMContainer getRmContainer() { +return rmContainer; + } + + public A getSchedulerApplicationAttempt() { +return schedulerApplicationAttempt; + } + + public N getSchedulerNode() { +return schedulerNode; + } + + public boolean isAllocated() { +return allocated; + } + + public SchedulerRequestKey getSchedulerRequestKey() { +if (rmContainer.getState() == RMContainerState.RESERVED) { + return rmContainer.getReservedSchedulerKey(); +} +return rmContainer.getAllocatedSchedulerKey(); + } + + @Override + public String toString() { +return "(Application=" + schedulerApplicationAttempt +.getApplicationAttemptId() + "; Node=" + schedulerNode.getNodeID() ++ "; Resource=" + rmContainer.getAllocatedOrReservedResource() + ")"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java -- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index ebe70d4..6d9dda8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -18,14 +18,7 @@ package
[09/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java -- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 42a8872..d875969 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -49,8 +49,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -123,6 +127,27 @@ public class TestParentQueue { return application; } + private void applyAllocationToQueue(Resource clusterResource, + int allocatedMem, + CSQueue queue) { +// Call accept & apply for queue +ResourceCommitRequest request = mock(ResourceCommitRequest.class); +when(request.anythingAllocatedOrReserved()).thenReturn(true); +ContainerAllocationProposal allocation = mock( +ContainerAllocationProposal.class); +when(request.getTotalReleasedResource()).thenReturn(Resources.none()); + when(request.getFirstAllocatedOrReservedContainer()).thenReturn(allocation); +SchedulerContainer scontainer = mock(SchedulerContainer.class); +when(allocation.getAllocatedOrReservedContainer()).thenReturn(scontainer); +when(allocation.getAllocatedOrReservedResource()).thenReturn( +Resources.createResource(allocatedMem)); +when(scontainer.getNodePartition()).thenReturn(""); + +if (queue.accept(clusterResource, request)) { + queue.apply(clusterResource, request); +} + } + private void stubQueueAllocation(final CSQueue queue, final Resource clusterResource, final FiCaSchedulerNode node, final int allocation) { @@ -157,7 +182,7 @@ public class TestParentQueue { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)).when(queue) - .assignContainers(eq(clusterResource), eq(node), + .assignContainers(eq(clusterResource), any(PlacementSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); // Mock the node's resource availability @@ -168,7 +193,7 @@ public class TestParentQueue { return new CSAssignment(allocatedResource, type); } -}).when(queue).assignContainers(eq(clusterResource), eq(node), +}).when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); } @@ -205,8 +230,8 @@ public class TestParentQueue { setupSingleLevelQueues(csConf); Mapqueues = new HashMap (); -CSQueue root = -CapacityScheduler.parseQueue(csContext, csConf, null, +CSQueue root = +CapacityScheduler.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); @@ -245,13 +270,18 @@ public class TestParentQueue { // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G stubQueueAllocation(a, clusterResource, node_1, 2*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); -root.assignContainers(clusterResource,