http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 882498a..eb64d43 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -56,8 +56,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -377,16 +375,29 @@ public class ParentQueue extends AbstractCSQueue { @Override public synchronized CSAssignment assignContainers(Resource clusterResource, - FiCaSchedulerNode node, ResourceLimits resourceLimits) { - CSAssignment assignment = - new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - Set<String> nodeLabels = node.getLabels(); - + FiCaSchedulerNode node, ResourceLimits resourceLimits, + SchedulingMode schedulingMode) { // if our queue cannot access this node, just return - if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) { - return assignment; + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY + && !accessibleToPartition(node.getPartition())) { + return NULL_ASSIGNMENT; + } + + // Check if this queue need more resource, simply skip allocation if this + // queue doesn't need more resources. + if (!super.hasPendingResourceRequest(node.getPartition(), + clusterResource, schedulingMode)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip this queue=" + getQueuePath() + + ", because it doesn't need more resource, schedulingMode=" + + schedulingMode.name() + " node-partition=" + node.getPartition()); + } + return NULL_ASSIGNMENT; } + CSAssignment assignment = + new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); + while (canAssign(clusterResource, node)) { if (LOG.isDebugEnabled()) { LOG.debug("Trying to assign containers to child-queue of " @@ -396,15 +407,17 @@ public class ParentQueue extends AbstractCSQueue { // Are we over maximum-capacity for this queue? // This will also consider parent's limits and also continuous reservation // looking - if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits, - minimumAllocation, Resources.createResource(getMetrics() - .getReservedMB(), getMetrics().getReservedVirtualCores()))) { + if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + resourceLimits, minimumAllocation, Resources.createResource( + getMetrics().getReservedMB(), getMetrics() + .getReservedVirtualCores()), schedulingMode)) { break; } // Schedule - CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node, resourceLimits); + CSAssignment assignedToChild = + assignContainersToChildQueues(clusterResource, node, resourceLimits, + schedulingMode); assignment.setType(assignedToChild.getType()); // Done if no child-queue assigned anything @@ -413,7 +426,7 @@ public class ParentQueue extends AbstractCSQueue { assignedToChild.getResource(), Resources.none())) { // Track resource utilization for the parent-queue super.allocateResource(clusterResource, assignedToChild.getResource(), - nodeLabels); + node.getPartition()); // Track resource utilization in this pass of the scheduler Resources @@ -510,7 +523,8 @@ public class ParentQueue extends AbstractCSQueue { } private synchronized CSAssignment assignContainersToChildQueues( - Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) { + Resource cluster, FiCaSchedulerNode node, ResourceLimits limits, + SchedulingMode schedulingMode) { CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); @@ -523,12 +537,13 @@ public class ParentQueue extends AbstractCSQueue { LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() + " stats: " + childQueue); } - + // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, cluster, limits); - assignment = childQueue.assignContainers(cluster, node, childLimits); + assignment = childQueue.assignContainers(cluster, node, + childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + " stats: " + childQueue + " --> " + @@ -584,7 +599,7 @@ public class ParentQueue extends AbstractCSQueue { // Book keeping synchronized (this) { super.releaseResource(clusterResource, rmContainer.getContainer() - .getResource(), node.getLabels()); + .getResource(), node.getPartition()); LOG.info("completedContainer" + " queue=" + getQueueName() + @@ -653,7 +668,7 @@ public class ParentQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); super.allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), node.getLabels()); + .getResource(), node.getPartition()); } if (parent != null) { parent.recoverContainer(clusterResource, attempt, rmContainer); @@ -681,7 +696,7 @@ public class ParentQueue extends AbstractCSQueue { FiCaSchedulerNode node = scheduler.getNode(rmContainer.getContainer().getNodeId()); super.allocateResource(clusterResource, rmContainer.getContainer() - .getResource(), node.getLabels()); + .getResource(), node.getPartition()); LOG.info("movedContainer" + " queueMoveIn=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" @@ -701,7 +716,7 @@ public class ParentQueue extends AbstractCSQueue { scheduler.getNode(rmContainer.getContainer().getNodeId()); super.releaseResource(clusterResource, rmContainer.getContainer().getResource(), - node.getLabels()); + node.getPartition()); LOG.info("movedContainer" + " queueMoveOut=" + getQueueName() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java new file mode 100644 index 0000000..7e7dc37 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +/** + * Scheduling modes, see below for detailed explanations + */ +public enum SchedulingMode { + /** + * <p> + * When a node has partition (say partition=x), only application in the queue + * can access to partition=x AND requires for partition=x resource can get + * chance to allocate on the node. + * </p> + * + * <p> + * When a node has no partition, only application requires non-partitioned + * resource can get chance to allocate on the node. + * </p> + */ + RESPECT_PARTITION_EXCLUSIVITY, + + /** + * Only used when a node has partition AND the partition isn't an exclusive + * partition AND application requires non-partitioned resource. + */ + IGNORE_PARTITION_EXCLUSIVITY +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index 76ede39..9b7eb84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.Task.State; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -277,6 +278,9 @@ public class Application { } else { request.setNumContainers(request.getNumContainers() + 1); } + if (request.getNodeLabelExpression() == null) { + request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + } // Note this down for next interaction with ResourceManager ask.remove(request); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index f62fdb3..5c107aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -150,8 +150,14 @@ public class MockAM { public AllocateResponse allocate( String host, int memory, int numContainers, List<ContainerId> releases, String labelExpression) throws Exception { + return allocate(host, memory, numContainers, 1, releases, labelExpression); + } + + public AllocateResponse allocate( + String host, int memory, int numContainers, int priority, + List<ContainerId> releases, String labelExpression) throws Exception { List<ResourceRequest> reqs = - createReq(new String[] { host }, memory, 1, numContainers, + createReq(new String[] { host }, memory, priority, numContainers, labelExpression); return allocate(reqs, releases); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 06c6b32..f2b1d86 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -200,10 +202,18 @@ public class MockRM extends ResourceManager { public boolean waitForState(MockNM nm, ContainerId containerId, RMContainerState containerState, int timeoutMillisecs) throws Exception { + return waitForState(Arrays.asList(nm), containerId, containerState, + timeoutMillisecs); + } + + public boolean waitForState(Collection<MockNM> nms, ContainerId containerId, + RMContainerState containerState, int timeoutMillisecs) throws Exception { RMContainer container = getResourceScheduler().getRMContainer(containerId); int timeoutSecs = 0; while(container == null && timeoutSecs++ < timeoutMillisecs / 100) { - nm.nodeHeartbeat(true); + for (MockNM nm : nms) { + nm.nodeHeartbeat(true); + } container = getResourceScheduler().getRMContainer(containerId); System.out.println("Waiting for container " + containerId + " to be allocated."); Thread.sleep(100); @@ -217,9 +227,11 @@ public class MockRM extends ResourceManager { && timeoutSecs++ < timeoutMillisecs / 100) { System.out.println("Container : " + containerId + " State is : " + container.getState() + " Waiting for state : " + containerState); - nm.nodeHeartbeat(true); + for (MockNM nm : nms) { + nm.nodeHeartbeat(true); + } Thread.sleep(100); - + if (timeoutMillisecs <= timeoutSecs * 100) { return false; } @@ -650,11 +662,28 @@ public class MockRM extends ResourceManager { am.waitForState(RMAppAttemptState.FINISHED); rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); } + + @SuppressWarnings("rawtypes") + private static void waitForSchedulerAppAttemptAdded( + ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException { + int tick = 0; + // Wait for at most 5 sec + while (null == ((AbstractYarnScheduler) rm.getResourceScheduler()) + .getApplicationAttempt(attemptId) && tick < 50) { + Thread.sleep(100); + if (tick % 10 == 0) { + System.out.println("waiting for SchedulerApplicationAttempt=" + + attemptId + " added."); + } + tick++; + } + } public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception { rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED); RMAppAttempt attempt = app.getCurrentAppAttempt(); + waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm); System.out.println("Launch AM " + attempt.getAppAttemptId()); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 1ca5c97..46167ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -612,7 +612,7 @@ public class TestApplicationLimits { // Schedule to compute queue.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); Resource expectedHeadroom = Resources.createResource(10*16*GB, 1); assertEquals(expectedHeadroom, app_0_0.getHeadroom()); @@ -632,7 +632,7 @@ public class TestApplicationLimits { // Schedule to compute queue.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); // Schedule to compute + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change @@ -652,7 +652,7 @@ public class TestApplicationLimits { // Schedule to compute queue.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); // Schedule to compute + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom()); @@ -661,7 +661,7 @@ public class TestApplicationLimits { // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); queue.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); // Schedule to compute + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_1.getHeadroom()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 23b31fa..970a98a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -133,7 +134,7 @@ public class TestChildQueueOrder { final Resource allocatedResource = Resources.createResource(allocation); if (queue instanceof ParentQueue) { ((ParentQueue)queue).allocateResource(clusterResource, - allocatedResource, null); + allocatedResource, RMNodeLabelsManager.NO_LABEL); } else { FiCaSchedulerApp app1 = getMockApplication(0, ""); ((LeafQueue)queue).allocateResource(clusterResource, app1, @@ -145,7 +146,7 @@ public class TestChildQueueOrder { doReturn(new CSAssignment(Resources.none(), type)). when(queue) .assignContainers(eq(clusterResource), eq(node), - any(ResourceLimits.class)); + any(ResourceLimits.class), any(SchedulingMode.class)); // Mock the node's resource availability Resource available = node.getAvailableResource(); @@ -157,7 +158,7 @@ public class TestChildQueueOrder { } }). when(queue).assignContainers(eq(clusterResource), eq(node), - any(ResourceLimits.class)); + any(ResourceLimits.class), any(SchedulingMode.class)); doNothing().when(node).releaseContainer(any(Container.class)); } @@ -241,6 +242,14 @@ public class TestChildQueueOrder { CSQueue b = queues.get(B); CSQueue c = queues.get(C); CSQueue d = queues.get(D); + + // Make a/b/c/d has >0 pending resource, so that allocation will continue. + queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage() + .incPending(Resources.createResource(1 * GB)); + a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); + b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); + c.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); + d.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); final String user_0 = "user_0"; @@ -275,7 +284,7 @@ public class TestChildQueueOrder { stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); root.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); for(int i=0; i < 2; i++) { stubQueueAllocation(a, clusterResource, node_0, 0*GB); @@ -283,7 +292,7 @@ public class TestChildQueueOrder { stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); root.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); } for(int i=0; i < 3; i++) { @@ -292,7 +301,7 @@ public class TestChildQueueOrder { stubQueueAllocation(c, clusterResource, node_0, 1*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); root.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); } for(int i=0; i < 4; i++) { @@ -301,7 +310,7 @@ public class TestChildQueueOrder { stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); root.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); } verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -335,7 +344,7 @@ public class TestChildQueueOrder { stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); root.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); } verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -363,7 +372,7 @@ public class TestChildQueueOrder { stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); root.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 3*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -390,7 +399,7 @@ public class TestChildQueueOrder { stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 0*GB); root.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -405,12 +414,14 @@ public class TestChildQueueOrder { stubQueueAllocation(c, clusterResource, node_0, 0*GB); stubQueueAllocation(d, clusterResource, node_0, 1*GB); root.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(d,b); - allocationOrder.verify(d).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), any(ResourceLimits.class)); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), any(ResourceLimits.class)); + allocationOrder.verify(d).assignContainers(eq(clusterResource), + any(FiCaSchedulerNode.class), any(ResourceLimits.class), + any(SchedulingMode.class)); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(FiCaSchedulerNode.class), any(ResourceLimits.class), + any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 03b8f5c..54ba617 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -51,9 +54,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; @@ -327,387 +334,4 @@ public class TestContainerAllocation { rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED); MockRM.launchAndRegisterAM(app1, rm1, nm1); } - - private Configuration getConfigurationWithQueueLabels(Configuration config) { - CapacitySchedulerConfiguration conf = - new CapacitySchedulerConfiguration(config); - - // Define top-level queues - conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); - conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); - conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100); - - final String A = CapacitySchedulerConfiguration.ROOT + ".a"; - conf.setCapacity(A, 10); - conf.setMaximumCapacity(A, 15); - conf.setAccessibleNodeLabels(A, toSet("x")); - conf.setCapacityByLabel(A, "x", 100); - - final String B = CapacitySchedulerConfiguration.ROOT + ".b"; - conf.setCapacity(B, 20); - conf.setAccessibleNodeLabels(B, toSet("y")); - conf.setCapacityByLabel(B, "y", 100); - - final String C = CapacitySchedulerConfiguration.ROOT + ".c"; - conf.setCapacity(C, 70); - conf.setMaximumCapacity(C, 70); - conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET); - - // Define 2nd-level queues - final String A1 = A + ".a1"; - conf.setQueues(A, new String[] {"a1"}); - conf.setCapacity(A1, 100); - conf.setMaximumCapacity(A1, 100); - conf.setCapacityByLabel(A1, "x", 100); - - final String B1 = B + ".b1"; - conf.setQueues(B, new String[] {"b1"}); - conf.setCapacity(B1, 100); - conf.setMaximumCapacity(B1, 100); - conf.setCapacityByLabel(B1, "y", 100); - - final String C1 = C + ".c1"; - conf.setQueues(C, new String[] {"c1"}); - conf.setCapacity(C1, 100); - conf.setMaximumCapacity(C1, 100); - - return conf; - } - - private void checkTaskContainersHost(ApplicationAttemptId attemptId, - ContainerId containerId, ResourceManager rm, String host) { - YarnScheduler scheduler = rm.getRMContext().getScheduler(); - SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId); - - Assert.assertTrue(appReport.getLiveContainers().size() > 0); - for (RMContainer c : appReport.getLiveContainers()) { - if (c.getContainerId().equals(containerId)) { - Assert.assertEquals(host, c.getAllocatedNode().getHost()); - } - } - } - - @SuppressWarnings("unchecked") - private <E> Set<E> toSet(E... elements) { - Set<E> set = Sets.newHashSet(elements); - return set; - } - - @Test (timeout = 300000) - public void testContainerAllocationWithSingleUserLimits() throws Exception { - final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); - mgr.init(conf); - - // set node -> label - mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); - mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), - NodeId.newInstance("h2", 0), toSet("y"))); - - // inject node label manager - MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { - @Override - public RMNodeLabelsManager createNodeLabelManager() { - return mgr; - } - }; - - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x - rm1.registerNode("h2:1234", 8000); // label = y - MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty> - - // launch an app to queue a1 (label = x), and check all container will - // be allocated in h1 - RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - // A has only 10% of x, so it can only allocate one container in label=empty - ContainerId containerId = - ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); - am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), ""); - Assert.assertTrue(rm1.waitForState(nm3, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - // Cannot allocate 2nd label=empty container - containerId = - ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); - am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), ""); - Assert.assertFalse(rm1.waitForState(nm3, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - - // A has default user limit = 100, so it can use all resource in label = x - // We can allocate floor(8000 / 1024) = 7 containers - for (int id = 3; id <= 8; id++) { - containerId = - ContainerId.newContainerId(am1.getApplicationAttemptId(), id); - am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x"); - Assert.assertTrue(rm1.waitForState(nm1, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - } - rm1.close(); - } - - @Test(timeout = 300000) - public void testContainerAllocateWithComplexLabels() throws Exception { - /* - * Queue structure: - * root (*) - * ________________ - * / \ - * a x(100%), y(50%) b y(50%), z(100%) - * ________________ ______________ - * / / \ - * a1 (x,y) b1(no) b2(y,z) - * 100% y = 100%, z = 100% - * - * Node structure: - * h1 : x - * h2 : y - * h3 : y - * h4 : z - * h5 : NO - * - * Total resource: - * x: 4G - * y: 6G - * z: 2G - * *: 2G - * - * Resource of - * a1: x=4G, y=3G, NO=0.2G - * b1: NO=0.9G (max=1G) - * b2: y=3, z=2G, NO=0.9G (max=1G) - * - * Each node can only allocate two containers - */ - - // set node -> label - mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z")); - mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), - toSet("x"), NodeId.newInstance("h2", 0), toSet("y"), - NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0), - toSet("z"), NodeId.newInstance("h5", 0), - RMNodeLabelsManager.EMPTY_STRING_SET)); - - // inject node label manager - MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) { - @Override - public RMNodeLabelsManager createNodeLabelManager() { - return mgr; - } - }; - - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 2048); - MockNM nm2 = rm1.registerNode("h2:1234", 2048); - MockNM nm3 = rm1.registerNode("h3:1234", 2048); - MockNM nm4 = rm1.registerNode("h4:1234", 2048); - MockNM nm5 = rm1.registerNode("h5:1234", 2048); - - ContainerId containerId; - - // launch an app to queue a1 (label = x), and check all container will - // be allocated in h1 - RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - // request a container (label = y). can be allocated on nm2 - am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y"); - containerId = - ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L); - Assert.assertTrue(rm1.waitForState(nm2, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, - "h2"); - - // launch an app to queue b1 (label = y), and check all container will - // be allocated in h5 - RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5); - - // request a container for AM, will succeed - // and now b1's queue capacity will be used, cannot allocate more containers - // (Maximum capacity reached) - am2.allocate("*", 1024, 1, new ArrayList<ContainerId>()); - containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2); - Assert.assertFalse(rm1.waitForState(nm4, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - Assert.assertFalse(rm1.waitForState(nm5, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - - // launch an app to queue b2 - RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2"); - MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5); - - // request a container. try to allocate on nm1 (label = x) and nm3 (label = - // y,z). Will successfully allocate on nm3 - am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y"); - containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2); - Assert.assertFalse(rm1.waitForState(nm1, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - Assert.assertTrue(rm1.waitForState(nm3, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, - "h3"); - - // try to allocate container (request label = z) on nm4 (label = y,z). - // Will successfully allocate on nm4 only. - am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z"); - containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L); - Assert.assertTrue(rm1.waitForState(nm4, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, - "h4"); - - rm1.close(); - } - - @Test (timeout = 120000) - public void testContainerAllocateWithLabels() throws Exception { - // set node -> label - mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); - mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), - NodeId.newInstance("h2", 0), toSet("y"))); - - // inject node label manager - MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) { - @Override - public RMNodeLabelsManager createNodeLabelManager() { - return mgr; - } - }; - - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x - MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y - MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty> - - ContainerId containerId; - - // launch an app to queue a1 (label = x), and check all container will - // be allocated in h1 - RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3); - - // request a container. - am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x"); - containerId = - ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); - Assert.assertFalse(rm1.waitForState(nm2, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - Assert.assertTrue(rm1.waitForState(nm1, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, - "h1"); - - // launch an app to queue b1 (label = y), and check all container will - // be allocated in h2 - RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3); - - // request a container. - am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y"); - containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2); - Assert.assertFalse(rm1.waitForState(nm1, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - Assert.assertTrue(rm1.waitForState(nm2, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1, - "h2"); - - // launch an app to queue c1 (label = ""), and check all container will - // be allocated in h3 - RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1"); - MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3); - - // request a container. - am3.allocate("*", 1024, 1, new ArrayList<ContainerId>()); - containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2); - Assert.assertFalse(rm1.waitForState(nm2, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - Assert.assertTrue(rm1.waitForState(nm3, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, - "h3"); - - rm1.close(); - } - - @Test (timeout = 120000) - public void testContainerAllocateWithDefaultQueueLabels() throws Exception { - // This test is pretty much similar to testContainerAllocateWithLabel. - // Difference is, this test doesn't specify label expression in ResourceRequest, - // instead, it uses default queue label expression - - // set node -> label - mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y")); - mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"), - NodeId.newInstance("h2", 0), toSet("y"))); - - // inject node label manager - MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) { - @Override - public RMNodeLabelsManager createNodeLabelManager() { - return mgr; - } - }; - - rm1.getRMContext().setNodeLabelManager(mgr); - rm1.start(); - MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x - MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y - MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty> - - ContainerId containerId; - - // launch an app to queue a1 (label = x), and check all container will - // be allocated in h1 - RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1"); - MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); - - // request a container. - am1.allocate("*", 1024, 1, new ArrayList<ContainerId>()); - containerId = - ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); - Assert.assertFalse(rm1.waitForState(nm3, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - Assert.assertTrue(rm1.waitForState(nm1, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1, - "h1"); - - // launch an app to queue b1 (label = y), and check all container will - // be allocated in h2 - RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1"); - MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); - - // request a container. - am2.allocate("*", 1024, 1, new ArrayList<ContainerId>()); - containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2); - Assert.assertFalse(rm1.waitForState(nm3, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - Assert.assertTrue(rm1.waitForState(nm2, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1, - "h2"); - - // launch an app to queue c1 (label = ""), and check all container will - // be allocated in h3 - RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1"); - MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3); - - // request a container. - am3.allocate("*", 1024, 1, new ArrayList<ContainerId>()); - containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2); - Assert.assertFalse(rm1.waitForState(nm2, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - Assert.assertTrue(rm1.waitForState(nm3, containerId, - RMContainerState.ALLOCATED, 10 * 1000)); - checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1, - "h3"); - - rm1.close(); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 972cabb..0b5250b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -351,7 +351,7 @@ public class TestLeafQueue { // Only 1 container a.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals( (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB), a.getMetrics().getAvailableMB()); @@ -487,7 +487,7 @@ public class TestLeafQueue { // Only 1 container a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -498,7 +498,7 @@ public class TestLeafQueue { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -507,7 +507,7 @@ public class TestLeafQueue { // Can't allocate 3rd due to user-limit a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -517,7 +517,7 @@ public class TestLeafQueue { // Bump up user-limit-factor, now allocate should work a.setUserLimitFactor(10); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -526,7 +526,7 @@ public class TestLeafQueue { // One more should work, for app_1, due to user-limit-factor a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -537,7 +537,7 @@ public class TestLeafQueue { // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); a.assignContainers(clusterResource, node_0, new ResourceLimits( - clusterResource)); + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -653,21 +653,21 @@ public class TestLeafQueue { // 1 container to user_0 a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Again one to user_0 since he hasn't exceeded user limit yet a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); // One more to user_0 since he is the only active user a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); @@ -719,10 +719,10 @@ public class TestLeafQueue { 1, qb.getActiveUsersManager().getNumActiveUsers()); //get headroom qb.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0 .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(), - null); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); //maxqueue 16G, userlimit 13G, - 4G used = 9G assertEquals(9*GB,app_0.getHeadroom().getMemory()); @@ -739,10 +739,10 @@ public class TestLeafQueue { u1Priority, recordFactory))); qb.submitApplicationAttempt(app_2, user_1); qb.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0 .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(), - null); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8*GB, qb.getUsedResources().getMemory()); assertEquals(4*GB, app_0.getCurrentConsumption().getMemory()); @@ -782,12 +782,12 @@ public class TestLeafQueue { qb.submitApplicationAttempt(app_1, user_0); qb.submitApplicationAttempt(app_3, user_1); qb.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); qb.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3 .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(), - null); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(4*GB, qb.getUsedResources().getMemory()); //maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both) assertEquals(5*GB, app_3.getHeadroom().getMemory()); @@ -803,13 +803,13 @@ public class TestLeafQueue { TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, u0Priority, recordFactory))); qb.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4 .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(), - null); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3 .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(), - null); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); //app3 is user1, active from last test case @@ -876,7 +876,7 @@ public class TestLeafQueue { priority, recordFactory))); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -893,7 +893,7 @@ public class TestLeafQueue { priority, recordFactory))); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -982,7 +982,7 @@ public class TestLeafQueue { // 1 container to user_0 a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -993,7 +993,7 @@ public class TestLeafQueue { // Again one to user_0 since he hasn't exceeded user limit yet a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -1010,7 +1010,7 @@ public class TestLeafQueue { // No more to user_0 since he is already over user-limit // and no more containers to queue since it's already at max-cap a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); @@ -1024,7 +1024,7 @@ public class TestLeafQueue { priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap } @@ -1095,7 +1095,7 @@ public class TestLeafQueue { // Only 1 container a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1103,7 +1103,7 @@ public class TestLeafQueue { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1111,7 +1111,7 @@ public class TestLeafQueue { // Can't allocate 3rd due to user-limit a.setUserLimit(25); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1130,7 +1130,7 @@ public class TestLeafQueue { // user_0 is at limit inspite of high user-limit-factor a.setUserLimitFactor(10); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1140,7 +1140,7 @@ public class TestLeafQueue { // Now allocations should goto app_0 since // user_0 is at user-limit not above it a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1151,7 +1151,7 @@ public class TestLeafQueue { // Now - no more allocs since we are at max-cap a.setMaxCapacity(0.5f); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1163,7 +1163,7 @@ public class TestLeafQueue { a.setMaxCapacity(1.0f); a.setUserLimitFactor(1); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(7*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1172,7 +1172,7 @@ public class TestLeafQueue { // Now we should assign to app_3 again since user_2 is under user-limit a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1272,7 +1272,7 @@ public class TestLeafQueue { // Only 1 container a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1283,7 +1283,7 @@ public class TestLeafQueue { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1292,7 +1292,7 @@ public class TestLeafQueue { // Now, reservation should kick in for app_1 a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1309,7 +1309,7 @@ public class TestLeafQueue { ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1326,7 +1326,7 @@ public class TestLeafQueue { ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1394,7 +1394,7 @@ public class TestLeafQueue { // Start testing... a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1404,7 +1404,7 @@ public class TestLeafQueue { // Now, reservation should kick in for app_1 a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1418,7 +1418,7 @@ public class TestLeafQueue { doReturn(-1).when(a).getNodeLocalityDelay(); a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(10*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1435,7 +1435,7 @@ public class TestLeafQueue { ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(8*GB, app_1.getCurrentConsumption().getMemory()); @@ -1504,7 +1504,7 @@ public class TestLeafQueue { // Only 1 container a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1512,14 +1512,14 @@ public class TestLeafQueue { // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // you can get one container more than user-limit a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); // Now, reservation should kick in for app_1 a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(6*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1534,7 +1534,7 @@ public class TestLeafQueue { ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1544,7 +1544,7 @@ public class TestLeafQueue { // Re-reserve a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); @@ -1554,7 +1554,7 @@ public class TestLeafQueue { // Try to schedule on node_1 now, should *move* the reservation a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(9*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1572,7 +1572,7 @@ public class TestLeafQueue { ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), RMContainerEventType.KILL, null, true); CSAssignment assignment = a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(8*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(4*GB, app_1.getCurrentConsumption().getMemory()); @@ -1644,7 +1644,7 @@ public class TestLeafQueue { // Start with off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); @@ -1653,7 +1653,7 @@ public class TestLeafQueue { // Another off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority)); @@ -1662,7 +1662,7 @@ public class TestLeafQueue { // Another off switch, shouldn't allocate due to delay scheduling assignment = a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority)); @@ -1672,7 +1672,7 @@ public class TestLeafQueue { // Another off switch, now we should allocate // since missedOpportunities=3 and reqdContainers=3 assignment = a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset @@ -1681,7 +1681,7 @@ public class TestLeafQueue { // NODE_LOCAL - node_0 assignment = a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1690,7 +1690,7 @@ public class TestLeafQueue { // NODE_LOCAL - node_1 assignment = a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1719,14 +1719,14 @@ public class TestLeafQueue { // Shouldn't assign RACK_LOCAL yet assignment = a.assignContainers(clusterResource, node_3, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertEquals(1, app_0.getSchedulingOpportunities(priority)); assertEquals(2, app_0.getTotalRequiredResources(priority)); assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL // Should assign RACK_LOCAL now assignment = a.assignContainers(clusterResource, node_3, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1808,7 +1808,7 @@ public class TestLeafQueue { // Start with off switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority_1)); @@ -1821,7 +1821,7 @@ public class TestLeafQueue { // Another off-switch, shouldn't allocate P1 due to delay scheduling // thus, no P2 either! a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(2, app_0.getSchedulingOpportunities(priority_1)); @@ -1833,7 +1833,7 @@ public class TestLeafQueue { // Another off-switch, shouldn't allocate OFF_SWITCH P1 a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(3, app_0.getSchedulingOpportunities(priority_1)); @@ -1845,7 +1845,7 @@ public class TestLeafQueue { // Now, DATA_LOCAL for P1 a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); @@ -1857,7 +1857,7 @@ public class TestLeafQueue { // Now, OFF_SWITCH for P2 a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), eq(priority_1), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority_1)); @@ -1934,7 +1934,7 @@ public class TestLeafQueue { // NODE_LOCAL - node_0_1 a.assignContainers(clusterResource, node_0_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -1943,7 +1943,7 @@ public class TestLeafQueue { // No allocation on node_1_0 even though it's node/rack local since // required(ANY) == 0 a.assignContainers(clusterResource, node_1_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero @@ -1960,7 +1960,7 @@ public class TestLeafQueue { // No allocation on node_0_1 even though it's node/rack local since // required(rack_1) == 0 a.assignContainers(clusterResource, node_0_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(1, app_0.getSchedulingOpportunities(priority)); @@ -1968,7 +1968,7 @@ public class TestLeafQueue { // NODE_LOCAL - node_1 a.assignContainers(clusterResource, node_1_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset @@ -2221,7 +2221,7 @@ public class TestLeafQueue { // node_0_1 // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false a.assignContainers(clusterResource, node_0_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -2244,7 +2244,7 @@ public class TestLeafQueue { // node_1_1 // Shouldn't allocate since RR(rack_1) = relax: false a.assignContainers(clusterResource, node_1_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -2275,7 +2275,7 @@ public class TestLeafQueue { // node_1_1 // Shouldn't allocate since node_1_1 is blacklisted a.assignContainers(clusterResource, node_1_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -2304,7 +2304,7 @@ public class TestLeafQueue { // node_1_1 // Shouldn't allocate since rack_1 is blacklisted a.assignContainers(clusterResource, node_1_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0 @@ -2331,7 +2331,7 @@ public class TestLeafQueue { // Now, should allocate since RR(rack_1) = relax: true a.assignContainers(clusterResource, node_1_1, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); @@ -2362,7 +2362,7 @@ public class TestLeafQueue { // host_1_1: 7G a.assignContainers(clusterResource, node_1_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), any(Priority.class), any(ResourceRequest.class), any(Container.class)); assertEquals(0, app_0.getSchedulingOpportunities(priority)); @@ -2445,7 +2445,7 @@ public class TestLeafQueue { try { a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource)); + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); } catch (NullPointerException e) { Assert.fail("NPE when allocating container on node but " + "forget to set off-switch request should be handled");