http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java new file mode 100644 index 0000000..19148d7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; + +public class PreemptableQueue { + // Partition -> killable resources and containers + private Map<String, Resource> totalKillableResources = new HashMap<>(); + private Map<String, Map<ContainerId, RMContainer>> killableContainers = + new HashMap<>(); + private PreemptableQueue parent; + + public PreemptableQueue(PreemptableQueue parent) { + this.parent = parent; + } + + public PreemptableQueue(Map<String, Resource> totalKillableResources, + Map<String, Map<ContainerId, RMContainer>> killableContainers) { + this.totalKillableResources = totalKillableResources; + this.killableContainers = killableContainers; + } + + void addKillableContainer(KillableContainer container) { + String partition = container.getNodePartition(); + if (!totalKillableResources.containsKey(partition)) { + totalKillableResources.put(partition, Resources.createResource(0)); + killableContainers.put(partition, + new ConcurrentSkipListMap<ContainerId, RMContainer>()); + } + + RMContainer c = container.getRMContainer(); + Resources.addTo(totalKillableResources.get(partition), + c.getAllocatedResource()); + killableContainers.get(partition).put(c.getContainerId(), c); + + if (null != parent) { + parent.addKillableContainer(container); + } + } + + void removeKillableContainer(KillableContainer container) { + String partition = container.getNodePartition(); + Map<ContainerId, RMContainer> partitionKillableContainers = + killableContainers.get(partition); + if (partitionKillableContainers != null) { + RMContainer rmContainer = 
partitionKillableContainers.remove( + container.getRMContainer().getContainerId()); + if (null != rmContainer) { + Resources.subtractFrom(totalKillableResources.get(partition), + rmContainer.getAllocatedResource()); + } + } + + if (null != parent) { + parent.removeKillableContainer(container); + } + } + + public Resource getKillableResource(String partition) { + Resource res = totalKillableResources.get(partition); + return res == null ? Resources.none() : res; + } + + @SuppressWarnings("unchecked") + public Map<ContainerId, RMContainer> getKillableContainers(String partition) { + Map<ContainerId, RMContainer> map = killableContainers.get(partition); + return map == null ? Collections.EMPTY_MAP : map; + } + + public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() { + return killableContainers; + } + + Map<String, Resource> getTotalKillableResources() { + return totalKillableResources; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java new file mode 100644 index 0000000..a9f02a5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class PreemptionManager { + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + private Map<String, PreemptableQueue> entities = new HashMap<>(); + + public PreemptionManager() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + } + + public void refreshQueues(CSQueue parent, CSQueue current) { + try { + writeLock.lock(); + PreemptableQueue parentEntity = null; + if (parent != null) { + parentEntity = entities.get(parent.getQueueName()); + } + + if (!entities.containsKey(current.getQueueName())) { + entities.put(current.getQueueName(), + new PreemptableQueue(parentEntity)); + } + + if (current.getChildQueues() != null) { + for (CSQueue child : current.getChildQueues()) { + refreshQueues(current, child); + } + } + } + finally { + writeLock.unlock(); + } + } + + public void addKillableContainer(KillableContainer container) { + try { + writeLock.lock(); + PreemptableQueue entity = entities.get(container.getLeafQueueName()); + if (null != entity) { + entity.addKillableContainer(container); + } + } + finally { + writeLock.unlock(); + } + } + + public void removeKillableContainer(KillableContainer container) { + try { + writeLock.lock(); + PreemptableQueue entity = 
entities.get(container.getLeafQueueName()); + if (null != entity) { + entity.removeKillableContainer(container); + } + } + finally { + writeLock.unlock(); + } + } + + public void moveKillableContainer(KillableContainer oldContainer, + KillableContainer newContainer) { + // TODO, will be called when partition of the node changed OR + // container moved to different queue + } + + public void updateKillableContainerResource(KillableContainer container, + Resource oldResource, Resource newResource) { + // TODO, will be called when container's resource changed + } + + @VisibleForTesting + public Map<ContainerId, RMContainer> getKillableContainersMap( + String queueName, String partition) { + try { + readLock.lock(); + PreemptableQueue entity = entities.get(queueName); + if (entity != null) { + Map<ContainerId, RMContainer> containers = + entity.getKillableContainers().get(partition); + if (containers != null) { + return containers; + } + } + return Collections.emptyMap(); + } + finally { + readLock.unlock(); + } + } + + public Iterator<RMContainer> getKillableContainers(String queueName, + String partition) { + return getKillableContainersMap(queueName, partition).values().iterator(); + } + + public Resource getKillableResource(String queueName, String partition) { + try { + readLock.lock(); + PreemptableQueue entity = entities.get(queueName); + if (entity != null) { + Resource res = entity.getTotalKillableResources().get(partition); + if (res == null || res.equals(Resources.none())) { + return Resources.none(); + } + return Resources.clone(res); + } + return Resources.none(); + } + finally { + readLock.unlock(); + } + } + + public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() { + try { + readLock.lock(); + Map<String, PreemptableQueue> map = new HashMap<>(); + for (Map.Entry<String, PreemptableQueue> entry : entities.entrySet()) { + String key = entry.getKey(); + PreemptableQueue entity = entry.getValue(); + map.put(key, new PreemptableQueue( + 
new HashMap<>(entity.getTotalKillableResources()), + new HashMap<>(entity.getKillableContainers()))); + } + return map; + } finally { + readLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java index 5158255..aad3bc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java @@ -120,9 +120,9 @@ public class AssignmentInformation { } private ContainerId getFirstContainerIdFromOperation(Operation op) { - if (null != operationDetails.get(Operation.ALLOCATION)) { + if (null != operationDetails.get(op)) { List<AssignmentDetails> assignDetails = - operationDetails.get(Operation.ALLOCATION); + operationDetails.get(op); if (!assignDetails.isEmpty()) { return assignDetails.get(0).containerId; } @@ -131,7 +131,7 @@ public class AssignmentInformation { } public ContainerId getFirstAllocatedOrReservedContainerId() { - ContainerId containerId = null; + ContainerId containerId; containerId = getFirstContainerIdFromOperation(Operation.ALLOCATION); if (null != containerId) 
{ return containerId; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 4d563cd..f474aad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMCont import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -94,6 +95,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { * to hold the 
message if its app doesn't not get container from a node */ private String appSkipNodeDiagnostics; + private CapacitySchedulerContext capacitySchedulerContext; public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, @@ -138,28 +140,30 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } containerAllocator = new ContainerAllocator(this, rc, rmContext); + + if (scheduler instanceof CapacityScheduler) { + capacitySchedulerContext = (CapacitySchedulerContext) scheduler; + } } - synchronized public boolean containerCompleted(RMContainer rmContainer, + public synchronized boolean containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event, String partition) { + ContainerId containerId = rmContainer.getContainerId(); // Remove from the list of containers - if (null == liveContainers.remove(rmContainer.getContainerId())) { + if (null == liveContainers.remove(containerId)) { return false; } - + // Remove from the list of newly allocated containers if found newlyAllocatedContainers.remove(rmContainer); - Container container = rmContainer.getContainer(); - ContainerId containerId = container.getId(); - // Inform the container rmContainer.handle( new RMContainerFinishedEvent(containerId, containerStatus, event)); - containersToPreempt.remove(rmContainer.getContainerId()); + containersToPreempt.remove(containerId); RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, "SchedulerApp", @@ -176,7 +180,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return true; } - synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, + public synchronized RMContainer allocate(NodeType type, FiCaSchedulerNode node, Priority priority, ResourceRequest request, Container container) { @@ -200,7 +204,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Add it to allContainers 
list. newlyAllocatedContainers.add(rmContainer); - liveContainers.put(container.getId(), rmContainer); + + ContainerId containerId = container.getId(); + liveContainers.put(containerId, rmContainer); // Update consumption and track allocations List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate( @@ -213,17 +219,17 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { // Inform the container rmContainer.handle( - new RMContainerEvent(container.getId(), RMContainerEventType.START)); + new RMContainerEvent(containerId, RMContainerEventType.START)); if (LOG.isDebugEnabled()) { LOG.debug("allocate: applicationAttemptId=" - + container.getId().getApplicationAttemptId() - + " container=" + container.getId() + " host=" + + containerId.getApplicationAttemptId() + + " container=" + containerId + " host=" + container.getNodeId().getHost() + " type=" + type); } RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, "SchedulerApp", - getApplicationId(), container.getId()); + getApplicationId(), containerId); return rmContainer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index fe6db47..1d0e78a 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -18,22 +18,29 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; - -import java.util.Set; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; public class FiCaSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); + private Map<ContainerId, RMContainer> killableContainers = new HashMap<>(); + private Resource totalKillableResources = Resource.newInstance(0, 0); public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName, Set<String> nodeLabels) { @@ -92,7 +99,6 @@ public class FiCaSchedulerNode extends SchedulerNode { @Override public synchronized void unreserveResource( SchedulerApplicationAttempt application) { - // adding NP checks as this can now be called for preemption if 
(getReservedContainer() != null && getReservedContainer().getContainer() != null @@ -115,4 +121,55 @@ public class FiCaSchedulerNode extends SchedulerNode { } setReservedContainer(null); } + + // According to decisions from preemption policy, mark the container to killable + public synchronized void markContainerToKillable(ContainerId containerId) { + RMContainer c = launchedContainers.get(containerId); + if (c != null && !killableContainers.containsKey(containerId)) { + killableContainers.put(containerId, c); + Resources.addTo(totalKillableResources, c.getAllocatedResource()); + } + } + + // According to decisions from preemption policy, mark the container to + // non-killable + public synchronized void markContainerToNonKillable(ContainerId containerId) { + RMContainer c = launchedContainers.get(containerId); + if (c != null && killableContainers.containsKey(containerId)) { + killableContainers.remove(containerId); + Resources.subtractFrom(totalKillableResources, c.getAllocatedResource()); + } + } + + @Override + protected synchronized void updateResource( + Container container) { + super.updateResource(container); + if (killableContainers.containsKey(container.getId())) { + Resources.subtractFrom(totalKillableResources, container.getResource()); + killableContainers.remove(container.getId()); + } + } + + @Override + protected synchronized void changeContainerResource(ContainerId containerId, + Resource deltaResource, boolean increase) { + super.changeContainerResource(containerId, deltaResource, increase); + + if (killableContainers.containsKey(containerId)) { + if (increase) { + Resources.addTo(totalKillableResources, deltaResource); + } else { + Resources.subtractFrom(totalKillableResources, deltaResource); + } + } + } + + public synchronized Resource getTotalKillableResources() { + return totalKillableResources; + } + + public synchronized Map<ContainerId, RMContainer> getKillableContainers() { + return killableContainers; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 9cf09e9..35b7c14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -38,10 +38,15 @@ public enum SchedulerEventType { // Source: ContainerAllocationExpirer CONTAINER_EXPIRED, - // Source: SchedulingEditPolicy + /* Source: SchedulingEditPolicy */ KILL_RESERVED_CONTAINER, - MARK_CONTAINER_FOR_PREEMPTION, // Mark a container for preemption - // in the near future - KILL_PREEMPTED_CONTAINER // Kill a container previously marked for - // preemption + + // Mark a container for preemption + MARK_CONTAINER_FOR_PREEMPTION, + + // Mark a for-preemption container killable + MARK_CONTAINER_FOR_KILLABLE, + + // Cancel a killable container + MARK_CONTAINER_FOR_NONKILLABLE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index d9306dd..c944752 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -59,7 +59,7 @@ public class TestRMDispatcher { rmDispatcher.getEventHandler().handle(event1); ContainerPreemptEvent event2 = new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.KILL_PREEMPTED_CONTAINER); + SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE); rmDispatcher.getEventHandler().handle(event2); ContainerPreemptEvent event3 = new ContainerPreemptEvent(appAttemptId, container, @@ -70,7 +70,7 @@ public class TestRMDispatcher { verify(sched, times(3)).handle(any(SchedulerEvent.class)); verify(sched).killReservedContainer(container); verify(sched).markContainerForPreemption(appAttemptId, container); - verify(sched).killPreemptedContainer(container); + verify(sched).markContainerForKillable(container); } catch (InterruptedException e) { Assert.fail(); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 028afb1..3057615 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -2352,7 +2352,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase { FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications() .get(app0.getApplicationId()).getCurrentAppAttempt(); // kill app0-attempt - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer( + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer( app0.getCurrentAppAttempt().getMasterContainer().getId())); am0.waitForState(RMAppAttemptState.FAILED); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 5035afe..16f3f60 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -566,7 +565,7 @@ public class TestAMRestart { ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the first attempt; - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); + scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); @@ -582,7 +581,7 @@ public class TestAMRestart { // Preempt the second attempt. ContainerId amContainer2 = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2)); + scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer2)); am2.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! 
attempt2.shouldCountTowardsMaxAttemptRetry()); @@ -677,7 +676,7 @@ public class TestAMRestart { ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Forcibly preempt the am container; - scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); + scheduler.markContainerForKillable(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 13f267d..e9129de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -23,7 +23,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; import static 
org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; @@ -167,6 +168,7 @@ public class TestProportionalCapacityPreemptionPolicy { when(mCS.getConfiguration()).thenReturn(schedConf); rmContext = mock(RMContext.class); when(mCS.getRMContext()).thenReturn(rmContext); + when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager()); when(rmContext.getNodeLabelManager()).thenReturn(lm); mDisp = mock(EventHandler.class); Dispatcher disp = mock(Dispatcher.class); @@ -289,7 +291,7 @@ public class TestProportionalCapacityPreemptionPolicy { List<ContainerPreemptEvent> events = 
evtCaptor.getAllValues(); for (ContainerPreemptEvent e : events.subList(20, 20)) { assertEquals(appC, e.getAppId()); - assertEquals(KILL_PREEMPTED_CONTAINER, e.getType()); + assertEquals(MARK_CONTAINER_FOR_KILLABLE, e.getType()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index 512f37c..21ea495 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -123,6 +124,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { mClock = mock(Clock.class); cs = mock(CapacityScheduler.class); when(cs.getResourceCalculator()).thenReturn(rc); + when(cs.getPreemptionManager()).thenReturn(new PreemptionManager()); nlm = mock(RMNodeLabelsManager.class); mDisp = mock(EventHandler.class); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 0b32676..171196f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -264,6 +265,7 @@ public class TestApplicationLimits { thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); // Say cluster has 100 nodes of 16G each Resource clusterResource = http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index 1569a12..d8161f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -205,7 +205,7 @@ public class TestApplicationPriority { if (++counter > 2) { break; } - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); } // check node report, 12 GB used and 4 GB available @@ -512,7 +512,7 @@ public class TestApplicationPriority { if (++counter > 2) { break; } - cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } @@ -542,7 +542,7 @@ public class TestApplicationPriority { if (++counter > 1) { break; } - cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.markContainerForKillable(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index b6c005b..16ba607 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1188,7 +1188,7 @@ public class TestCapacityScheduler { // kill the 3 containers for (Container c : allocatedContainers) { - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1197,7 +1197,7 @@ public class TestCapacityScheduler { Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); // kill app0-attempt0 AM container - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0 + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0 .getCurrentAppAttempt().getMasterContainer().getId())); // wait for app0 failed @@ -1220,7 +1220,7 @@ public class TestCapacityScheduler { allocatedContainers = am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); for (Container c : allocatedContainers) { - cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1269,7 +1269,7 @@ public class TestCapacityScheduler { } // Call killContainer to preempt the container - cs.killPreemptedContainer(rmContainer); + cs.markContainerForKillable(rmContainer); Assert.assertEquals(3, requests.size()); for (ResourceRequest request : requests) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java new file mode 100644 index 0000000..bea7797 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java @@ -0,0 +1,677 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import 
org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestCapacitySchedulerPreemption { + private static final Log LOG = LogFactory.getLog( + TestCapacitySchedulerPreemption.class); + + private final int GB = 1024; + + private Configuration conf; + + RMNodeLabelsManager mgr; + + Clock clock; + + @Before + public void setUp() throws Exception { + conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class); + conf = TestUtils.getConfigurationWithMultipleQueues(this.conf); + + // Set preemption related configurations + conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL, + 0); + conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, + true); + conf.setFloat( + ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f); + conf.setFloat( + ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f); + mgr = new NullRMNodeLabelsManager(); + mgr.init(this.conf); + clock = mock(Clock.class); + when(clock.getTime()).thenReturn(0L); + } + + private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) { + RMActiveServices activeServices = rm.getRMActiveService(); + SchedulingMonitor mon = null; + for (Service service : activeServices.getServices()) { + if (service instanceof SchedulingMonitor) { + mon = (SchedulingMonitor) service; + break; + } + } + + if (mon != null) { + return mon.getSchedulingEditPolicy(); + } + return null; + } + + @Test (timeout = 60000) + public void testSimplePreemption() throws Exception { + /** + * Test case: Submit two 
application (app1/app2) to different queues, queue + * structure: + * + * <pre> + * Root + * / | \ + * a b c + * 10 20 70 + * </pre> + * + * 1) Two nodes in the cluster, each of them has 4G. + * + * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no + * more resource available. + * + * 3) app2 submit to queue-c, ask for one 1G container (for AM) + * + * Now the cluster is fulfilled. + * + * 4) app2 asks for another 1G container, system will preempt one container + * from app1, and app2 will receive the preempted container + */ + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + + MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>()); + + // Do allocation 3 times for node1/node2 + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // NM1/NM2 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + 
Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) + .getUnallocatedResource().getMemory()); + + // AM asks for a 1 * GB container + am2.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(1), ResourceRequest.ANY, + Resources.createResource(1 * GB), 1)), null); + + // Get edit policy and do one update + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if one container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + PreemptionManager pm = cs.getPreemptionManager(); + Map<ContainerId, RMContainer> killableContainers = + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); + Assert.assertEquals(1, killableContainers.size()); + Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() + .getApplicationAttemptId(), am1.getApplicationAttemptId()); + + // Call CS.handle once to see if container preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + + // App1 has 6 containers, and app2 has 2 containers + Assert.assertEquals(6, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } + + @Test (timeout = 60000) + public void testPreemptionConsidersNodeLocalityDelay() + throws Exception { + /** + * Test case: same as testSimplePreemption steps 1-3. + * + * Step 4: app2 asks for 1G container with locality specified, so it needs + * to wait for missed-opportunity before get scheduled. 
+ * Check if system waits missed-opportunity before finish killable container + */ + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); + + // Do allocation 3 times for node1/node2 + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // NM1/NM2 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) + .getUnallocatedResource().getMemory()); + + // AM asks for a 1 * GB container with unknown host and unknown rack + am2.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(1), ResourceRequest.ANY, + Resources.createResource(1 * GB), 1), ResourceRequest + .newInstance(Priority.newInstance(1), "unknownhost", + Resources.createResource(1 * GB), 1), ResourceRequest + 
.newInstance(Priority.newInstance(1), "/default-rack", + Resources.createResource(1 * GB), 1)), null); + + // Get edit policy and do one update + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if one container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + PreemptionManager pm = cs.getPreemptionManager(); + Map<ContainerId, RMContainer> killableContainers = + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); + Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() + .getApplicationAttemptId(), am1.getApplicationAttemptId()); + + // Call CS.handle once to see if container preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + + // App1 has 7 containers, and app2 has 1 container (no container preempted) + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + // Do allocation again, one container will be preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + // App1 has 6 containers, and app2 has 2 containers (new container allocated) + Assert.assertEquals(6, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } + + @Test (timeout = 60000) + public void testPreemptionConsidersHardNodeLocality() + throws Exception { + /** + * Test case: same as testSimplePreemption steps 1-3. + * + * Step 4: app2 asks for 1G container with hard locality specified, and + * the asked host does not exist in the cluster. + * Confirm system doesn't preempt any container.
+ */ + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); + + // Do allocation 3 times for node1/node2 + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // NM1/NM2 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) + .getUnallocatedResource().getMemory()); + + // AM asks for a 1 * GB container for h3 with hard locality, + // h3 doesn't exist in the cluster + am2.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(1), ResourceRequest.ANY, + Resources.createResource(1 * GB), 1, true), ResourceRequest + .newInstance(Priority.newInstance(1), "h3", + Resources.createResource(1 * GB), 1, false), ResourceRequest + .newInstance(Priority.newInstance(1), 
"/default-rack", + Resources.createResource(1 * GB), 1, false)), null); + + // Get edit policy and do one update + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if one container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + PreemptionManager pm = cs.getPreemptionManager(); + Map<ContainerId, RMContainer> killableContainers = + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1); + Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey() + .getApplicationAttemptId(), am1.getApplicationAttemptId()); + + // Call CS.handle once to see if container preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + + // App1 has 7 containers, and app2 has 1 containers (no container preempted) + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + // Do allocation again, nothing will be preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + // App1 has 7 containers, and app2 has 1 containers (no container allocated) + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } + + @Test (timeout = 60000) + public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers() + throws Exception { + /** + * Test case: + * <pre> + * Root + * / | \ + * a b c + * 10 20 70 + * </pre> + * Submit applications to two queues, one uses more than the other, so + * preemption will happen. 
+ * + * Check: + * 1) Killable containers resources will be excluded from PCPP (no duplicated + * container added to killable list) + * 2) When more resources need to be preempted, new containers will be selected + * and killable containers will be considered + */ + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); + + // Do allocation 6 times for node1 + for (int i = 0; i < 6; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // NM1 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>()); + + // Get edit policy and do one update + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if one container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + PreemptionManager pm = cs.getPreemptionManager(); + waitKillableContainersSize(pm, "a", 
RMNodeLabelsManager.NO_LABEL, 1); + + // Check killable containers and to-be-preempted containers in edit policy + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + + // Run edit schedule again, confirm status doesn't changed + editPolicy.editSchedule(); + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + + // Save current to kill containers + Set<ContainerId> previousKillableContainers = new HashSet<>( + pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL) + .keySet()); + + // Update request resource of c from 1 to 2, so we need to preempt + // one more container + am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>()); + + // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map + // and 1 container in killable map + editPolicy.editSchedule(); + Assert.assertEquals(1, editPolicy.getToPreemptContainers().size()); + + // Call editPolicy.editSchedule() once more, we should have 2 containers killable map + editPolicy.editSchedule(); + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + + // Check if previous killable containers included by new killable containers + Map<ContainerId, RMContainer> killableContainers = + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2); + Assert.assertTrue( + Sets.difference(previousKillableContainers, killableContainers.keySet()) + .isEmpty()); + } + + @Test (timeout = 60000) + public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded() + throws Exception { + /** + * Test case: + * <pre> + * Root + * / | \ + * a b c + * 10 20 70 + * </pre> + * Submit applications to two queues, one uses more than the other, so + * preemption will happen. 
+ * + * Check: + * 1) Containers will be marked to killable + * 2) Cancel resource request + * 3) Killable containers will be cancelled from policy and scheduler + */ + MockRM rm1 = new MockRM(conf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); + + // Do allocation 6 times for node1 + for (int i = 0; i < 6; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // NM1 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>()); + + // Get edit policy and do one update + ProportionalCapacityPreemptionPolicy editPolicy = + (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if 3 containers from app1 are marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + PreemptionManager pm = cs.getPreemptionManager(); + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3); + + // Change request from 3G to 2G, now we can preempt one less container.
(3->2) + am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>()); + editPolicy.editSchedule(); + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2); + + // Call editSchedule once more to make sure still nothing happens + editPolicy.editSchedule(); + Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2); + } + + @Test (timeout = 60000) + public void testPreemptionConsidersUserLimit() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + * <pre> + * Root + * / | \ + * a b c + * 10 20 70 + * </pre> + * + * Queue-c's user-limit-factor = 0.1, so single user cannot allocate >1 containers in queue-c + * + * 1) Two nodes in the cluster, each of them has 4G. + * + * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no + * more resource available. + * + * 3) app2 submit to queue-c, ask for one 1G container (for AM) + * + * Now the cluster is fulfilled. 
+ * + * 4) app2 asks for another 1G container, system will preempt one container + * from app1, and app2 will receive the preempted container + */ + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf); + csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f); + MockRM rm1 = new MockRM(csConf); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB); + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + // launch an app to queue, AM container should be launched in nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>()); + + // Do allocation 3 times for node1/node2 + for (int i = 0; i < 3; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // App1 should have 7 containers now, and no available resource for cluster + FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt( + am1.getApplicationAttemptId()); + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + + // Submit app2 to queue-c and asks for a 1G container for AM + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // NM1/NM2 has available resource = 0G + Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId()) + .getUnallocatedResource().getMemory()); + Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId()) + .getUnallocatedResource().getMemory()); + + // AM asks for a 1 * GB container + am2.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(1), ResourceRequest.ANY, + 
Resources.createResource(1 * GB), 1)), null); + + // Get edit policy and do one update + SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); + + // Call edit schedule twice, and check if no container from app1 marked + // to be "killable" + editPolicy.editSchedule(); + editPolicy.editSchedule(); + + // No preemption happens + PreemptionManager pm = cs.getPreemptionManager(); + Map<ContainerId, RMContainer> killableContainers = + waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0); + Assert.assertEquals(0, killableContainers.size()); + + // Call CS.handle once to see if container preempted + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( + am2.getApplicationAttemptId()); + + // App1 has 7 containers, and app2 has 1 containers (nothing preempted) + Assert.assertEquals(7, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(1, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } + + private Map<ContainerId, RMContainer> waitKillableContainersSize( + PreemptionManager pm, String queueName, String partition, + int expectedSize) throws InterruptedException { + Map<ContainerId, RMContainer> killableContainers = + pm.getKillableContainersMap(queueName, partition); + + int wait = 0; + // Wait for at most 5 sec (it should be super fast actually) + while (expectedSize != killableContainers.size() && wait < 500) { + killableContainers = pm.getKillableContainersMap(queueName, partition); + Thread.sleep(10); + wait++; + } + + Assert.assertEquals(expectedSize, killableContainers.size()); + return killableContainers; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 5169337..1612201 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -99,6 +100,7 @@ public class TestChildQueueOrder { when(csContext.getResourceCalculator()). 
thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); } private FiCaSchedulerApp getMockApplication(int appId, String user) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 69b0813..87a3d51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -150,6 +151,7 @@ public class TestLeafQueue { thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getRMContext()).thenReturn(rmContext); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(conf); @@ -3092,6 +3094,7 @@ public class TestLeafQueue { Resources.createResource(GB, 1)); when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(2 * GB, 2)); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); return csContext; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index bbf6e43..1ee201d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -39,6 +39,7 
@@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -1676,4 +1677,100 @@ public class TestNodeLabelContainerAllocation { checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), cs.getApplicationAttempt(am1.getApplicationAttemptId())); } + + @Test + public void testParentQueueMaxCapsAreRespected() throws Exception { + /* + * Queue tree: + * Root + * / \ + * A B + * / \ + * A1 A2 + * + * A has 50% capacity and 50% max capacity (of label=x) + * A1/A2 has 50% capacity and 100% max capacity (of label=x) + * Cluster has one node (label=x) with resource = 24G. + * So we can at most use 12G resources under queueA. 
+ */ + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a", + "b"}); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(A, 10); + csConf.setAccessibleNodeLabels(A, toSet("x")); + csConf.setCapacityByLabel(A, "x", 50); + csConf.setMaximumCapacityByLabel(A, "x", 50); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(B, 90); + csConf.setAccessibleNodeLabels(B, toSet("x")); + csConf.setCapacityByLabel(B, "x", 50); + csConf.setMaximumCapacityByLabel(B, "x", 50); + + // Define 2nd-level queues + csConf.setQueues(A, new String[] { "a1", + "a2"}); + + final String A1 = A + ".a1"; + csConf.setCapacity(A1, 50); + csConf.setAccessibleNodeLabels(A1, toSet("x")); + csConf.setCapacityByLabel(A1, "x", 50); + csConf.setMaximumCapacityByLabel(A1, "x", 100); + csConf.setUserLimitFactor(A1, 100.0f); + + final String A2 = A + ".a2"; + csConf.setCapacity(A2, 50); + csConf.setAccessibleNodeLabels(A2, toSet("x")); + csConf.setCapacityByLabel(A2, "x", 50); + csConf.setMaximumCapacityByLabel(A2, "x", 100); + csConf.setUserLimitFactor(A2, 100.0f); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of( + NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + MockNM nm1 = + new MockNM("h1:1234", 24 * GB, rm.getResourceTrackerService()); + nm1.registerNode(); + + // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB + RMApp app1 
= rm.submitApp(1 * GB, "app", "user", null, "a1", "x"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + am1.allocate("*", 4 * GB, 2, new ArrayList<ContainerId>(), "x"); + doNMHeartbeat(rm, nm1.getNodeId(), 10); + checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(), + cs.getApplicationAttempt(am1.getApplicationAttemptId())); + + // Try to launch app2 in a2, asked 2GB, should success + RMApp app2 = rm.submitApp(2 * GB, "app", "user", null, "a2", "x"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + + // am2 asks more resources, cannot success because current used = 9G (app1) + // + 2G (app2) = 11G, and queue's max capacity = 12G + am2.allocate("*", 2 * GB, 2, new ArrayList<ContainerId>(), "x"); + + doNMHeartbeat(rm, nm1.getNodeId(), 10); + checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(), + cs.getApplicationAttempt(am2.getApplicationAttemptId())); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index f73baa4..23dc860 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -92,6 +93,7 @@ public class TestParentQueue { thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getNonPartitionedQueueComparator()). thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getResourceCalculator()). 
thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7e8c9beb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 2ef5e39..56facee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -126,6 +127,7 @@ public class 
TestReservations { when(csContext.getNonPartitionedQueueComparator()).thenReturn( CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getRMContext()).thenReturn(rmContext); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager( conf);