YARN-4822. Refactor the existing preemption policy of the CapacityScheduler (CS) to make it easier to add new approaches for selecting preemption candidates. Contributed by Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/60e4116b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/60e4116b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/60e4116b Branch: refs/heads/HDFS-1312 Commit: 60e4116bf1d00afed91010e57357fe54057e4e39 Parents: 09d63d5 Author: Jian He <jia...@apache.org> Authored: Wed Mar 30 12:43:52 2016 -0700 Committer: Jian He <jia...@apache.org> Committed: Wed Mar 30 12:43:52 2016 -0700 ---------------------------------------------------------------------- .../monitor/SchedulingEditPolicy.java | 8 +- .../monitor/SchedulingMonitor.java | 4 - .../CapacitySchedulerPreemptionContext.java | 52 + .../CapacitySchedulerPreemptionUtils.java | 65 ++ .../capacity/FifoCandidatesSelector.java | 364 ++++++ .../capacity/PreemptableResourceCalculator.java | 370 ++++++ .../capacity/PreemptionCandidatesSelector.java | 52 + .../ProportionalCapacityPreemptionPolicy.java | 1086 ++++-------------- .../monitor/capacity/TempQueuePerPartition.java | 159 +++ .../CapacitySchedulerConfiguration.java | 45 + .../capacity/preemption/PreemptableQueue.java | 6 - .../capacity/preemption/PreemptionManager.java | 2 +- ...estProportionalCapacityPreemptionPolicy.java | 133 +-- ...pacityPreemptionPolicyForNodePartitions.java | 78 +- .../TestCapacitySchedulerPreemption.java | 14 +- 15 files changed, 1432 insertions(+), 1006 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java index 0d587d8..47458a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java @@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResour public interface SchedulingEditPolicy { - public void init(Configuration config, RMContext context, + void init(Configuration config, RMContext context, PreemptableResourceScheduler scheduler); /** @@ -31,10 +31,10 @@ public interface SchedulingEditPolicy { * allowed to track containers and affect the scheduler. The "actions" * performed are passed back through an EventHandler. 
*/ - public void editSchedule(); + void editSchedule(); - public long getMonitoringInterval(); + long getMonitoringInterval(); - public String getPolicyName(); + String getPolicyName(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index d4c129b..55ec858 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -45,10 +45,6 @@ public class SchedulingMonitor extends AbstractService { this.rmContext = rmContext; } - public long getMonitorInterval() { - return monitorInterval; - } - @VisibleForTesting public synchronized SchedulingEditPolicy getSchedulingEditPolicy() { return scheduleEditPolicy; http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java new file mode 100644 index 0000000..c52127d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +import java.util.Collection; +import java.util.Set; + +interface CapacitySchedulerPreemptionContext { + CapacityScheduler getScheduler(); + + TempQueuePerPartition getQueueByPartition(String queueName, + String partition); + + Collection<TempQueuePerPartition> getQueuePartitions(String queueName); + + ResourceCalculator getResourceCalculator(); + + RMContext getRMContext(); + + boolean isObserveOnly(); + + Set<ContainerId> getKillableContainers(); + + double getMaxIgnoreOverCapacity(); + + double getNaturalTerminationFactor(); + + Set<String> getLeafQueueNames(); + + Set<String> getAllPartitions(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java new file mode 100644 index 0000000..a71f108 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -0,0 +1,65 
@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class CapacitySchedulerPreemptionUtils { + public static Map<String, Resource> getResToObtainByPartitionForLeafQueue( + CapacitySchedulerPreemptionContext context, String queueName, + Resource clusterResource) { + Map<String, Resource> resToObtainByPartition = new HashMap<>(); + // compute resToObtainByPartition considered inter-queue preemption + for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) { + if (qT.preemptionDisabled) { + continue; + } + + // Only add resToObtainByPartition when actuallyToBePreempted resource >= 0 + if (Resources.greaterThan(context.getResourceCalculator(), + clusterResource, qT.actuallyToBePreempted, Resources.none())) { + resToObtainByPartition.put(qT.partition, + 
Resources.clone(qT.actuallyToBePreempted)); + } + } + + return resToObtainByPartition; + } + + public static boolean isContainerAlreadySelected(RMContainer container, + Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) { + if (null == selectedCandidates) { + return false; + } + + Set<RMContainer> containers = selectedCandidates.get( + container.getApplicationAttemptId()); + if (containers == null) { + return false; + } + return containers.contains(container); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java new file mode 100644 index 0000000..499d0ff --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java @@ -0,0 +1,364 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +public class FifoCandidatesSelector + extends PreemptionCandidatesSelector { + private static final Log LOG = + LogFactory.getLog(FifoCandidatesSelector.class); + private PreemptableResourceCalculator preemptableAmountCalculator; + + FifoCandidatesSelector( + 
CapacitySchedulerPreemptionContext preemptionContext) { + super(preemptionContext); + + preemptableAmountCalculator = new PreemptableResourceCalculator( + preemptionContext); + } + + @Override + public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates( + Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates, + Resource clusterResource, Resource totalPreemptionAllowed) { + // Calculate how much resources we need to preempt + preemptableAmountCalculator.computeIdealAllocation(clusterResource, + totalPreemptionAllowed); + + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap = + new HashMap<>(); + List<RMContainer> skippedAMContainerlist = new ArrayList<>(); + + // Loop all leaf queues + for (String queueName : preemptionContext.getLeafQueueNames()) { + // check if preemption disabled for the queue + if (preemptionContext.getQueueByPartition(queueName, + RMNodeLabelsManager.NO_LABEL).preemptionDisabled) { + if (LOG.isDebugEnabled()) { + LOG.debug("skipping from queue=" + queueName + + " because it's a non-preemptable queue"); + } + continue; + } + + // compute resToObtainByPartition considered inter-queue preemption + LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName, + RMNodeLabelsManager.NO_LABEL).leafQueue; + + Map<String, Resource> resToObtainByPartition = + CapacitySchedulerPreemptionUtils + .getResToObtainByPartitionForLeafQueue(preemptionContext, + queueName, clusterResource); + + synchronized (leafQueue) { + // go through all ignore-partition-exclusivity containers first to make + // sure such containers will be preemptionCandidates first + Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers = + leafQueue.getIgnoreExclusivityRMContainers(); + for (String partition : resToObtainByPartition.keySet()) { + if (ignorePartitionExclusivityContainers.containsKey(partition)) { + TreeSet<RMContainer> rmContainers = + ignorePartitionExclusivityContainers.get(partition); + // We will check container from 
reverse order, so latter submitted + // application's containers will be preemptionCandidates first. + for (RMContainer c : rmContainers.descendingSet()) { + if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, + selectedCandidates)) { + // Skip already selected containers + continue; + } + boolean preempted = tryPreemptContainerAndDeductResToObtain( + resToObtainByPartition, c, clusterResource, preemptMap, + totalPreemptionAllowed); + if (!preempted) { + continue; + } + } + } + } + + // preempt other containers + Resource skippedAMSize = Resource.newInstance(0, 0); + Iterator<FiCaSchedulerApp> desc = + leafQueue.getOrderingPolicy().getPreemptionIterator(); + while (desc.hasNext()) { + FiCaSchedulerApp fc = desc.next(); + // When we complete preempt from one partition, we will remove from + // resToObtainByPartition, so when it becomes empty, we can get no + // more preemption is needed + if (resToObtainByPartition.isEmpty()) { + break; + } + + preemptFrom(fc, clusterResource, resToObtainByPartition, + skippedAMContainerlist, skippedAMSize, preemptMap, + totalPreemptionAllowed); + } + + // Can try preempting AMContainers (still saving atmost + // maxAMCapacityForThisQueue AMResource's) if more resources are + // required to be preemptionCandidates from this Queue. + Resource maxAMCapacityForThisQueue = Resources.multiply( + Resources.multiply(clusterResource, + leafQueue.getAbsoluteCapacity()), + leafQueue.getMaxAMResourcePerQueuePercent()); + + preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist, + resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue, + totalPreemptionAllowed); + } + } + + return preemptMap; + } + + /** + * As more resources are needed for preemption, saved AMContainers has to be + * rescanned. Such AMContainers can be preemptionCandidates based on resToObtain, but + * maxAMCapacityForThisQueue resources will be still retained. 
+ * + * @param clusterResource + * @param preemptMap + * @param skippedAMContainerlist + * @param skippedAMSize + * @param maxAMCapacityForThisQueue + */ + private void preemptAMContainers(Resource clusterResource, + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, + List<RMContainer> skippedAMContainerlist, + Map<String, Resource> resToObtainByPartition, Resource skippedAMSize, + Resource maxAMCapacityForThisQueue, Resource totalPreemptionAllowed) { + for (RMContainer c : skippedAMContainerlist) { + // Got required amount of resources for preemption, can stop now + if (resToObtainByPartition.isEmpty()) { + break; + } + // Once skippedAMSize reaches down to maxAMCapacityForThisQueue, + // container selection iteration for preemption will be stopped. + if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize, + maxAMCapacityForThisQueue)) { + break; + } + + boolean preempted = + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, preemptMap, totalPreemptionAllowed); + if (preempted) { + Resources.subtractFrom(skippedAMSize, c.getAllocatedResource()); + } + } + skippedAMContainerlist.clear(); + } + + private boolean preemptMapContains( + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, + ApplicationAttemptId attemptId, RMContainer rmContainer) { + Set<RMContainer> rmContainers; + if (null == (rmContainers = preemptMap.get(attemptId))) { + return false; + } + return rmContainers.contains(rmContainer); + } + + /** + * Return should we preempt rmContainer. 
If we should, deduct from + * <code>resourceToObtainByPartition</code> + */ + private boolean tryPreemptContainerAndDeductResToObtain( + Map<String, Resource> resourceToObtainByPartitions, + RMContainer rmContainer, Resource clusterResource, + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, + Resource totalPreemptionAllowed) { + ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId(); + + // We will not account resource of a container twice or more + if (preemptMapContains(preemptMap, attemptId, rmContainer)) { + return false; + } + + String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode()); + Resource toObtainByPartition = + resourceToObtainByPartitions.get(nodePartition); + + if (null != toObtainByPartition && Resources.greaterThan(rc, + clusterResource, toObtainByPartition, Resources.none()) && Resources + .fitsIn(rc, clusterResource, rmContainer.getAllocatedResource(), + totalPreemptionAllowed)) { + Resources.subtractFrom(toObtainByPartition, + rmContainer.getAllocatedResource()); + Resources.subtractFrom(totalPreemptionAllowed, + rmContainer.getAllocatedResource()); + + // When we have no more resource need to obtain, remove from map. + if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition, + Resources.none())) { + resourceToObtainByPartitions.remove(nodePartition); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Marked container=" + rmContainer.getContainerId() + + " in partition=" + nodePartition + + " to be preemption candidates"); + } + // Add to preemptMap + addToPreemptMap(preemptMap, attemptId, rmContainer); + return true; + } + + return false; + } + + private String getPartitionByNodeId(NodeId nodeId) { + return preemptionContext.getScheduler().getSchedulerNode(nodeId) + .getPartition(); + } + + /** + * Given a target preemption for a specific application, select containers + * to preempt (after unreserving all reservation for that app). 
+ */ + @SuppressWarnings("unchecked") + private void preemptFrom(FiCaSchedulerApp app, + Resource clusterResource, Map<String, Resource> resToObtainByPartition, + List<RMContainer> skippedAMContainerlist, Resource skippedAMSize, + Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers, + Resource totalPreemptionAllowed) { + ApplicationAttemptId appId = app.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Looking at application=" + app.getApplicationAttemptId() + + " resourceToObtain=" + resToObtainByPartition); + } + + // first drop reserved containers towards rsrcPreempt + List<RMContainer> reservedContainers = + new ArrayList<>(app.getReservedContainers()); + for (RMContainer c : reservedContainers) { + if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, + selectedContainers)) { + continue; + } + if (resToObtainByPartition.isEmpty()) { + return; + } + + // Try to preempt this container + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, selectedContainers, totalPreemptionAllowed); + + if (!preemptionContext.isObserveOnly()) { + preemptionContext.getRMContext().getDispatcher().getEventHandler() + .handle(new ContainerPreemptEvent(appId, c, + SchedulerEventType.KILL_RESERVED_CONTAINER)); + } + } + + // if more resources are to be freed go through all live containers in + // reverse priority and reverse allocation order and mark them for + // preemption + List<RMContainer> liveContainers = + new ArrayList<>(app.getLiveContainers()); + + sortContainers(liveContainers); + + for (RMContainer c : liveContainers) { + if (resToObtainByPartition.isEmpty()) { + return; + } + + if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c, + selectedContainers)) { + continue; + } + + // Skip already marked to killable containers + if (null != preemptionContext.getKillableContainers() && preemptionContext + .getKillableContainers().contains(c.getContainerId())) { + continue; + } + + // Skip AM 
Container from preemption for now. + if (c.isAMContainer()) { + skippedAMContainerlist.add(c); + Resources.addTo(skippedAMSize, c.getAllocatedResource()); + continue; + } + + // Try to preempt this container + tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c, + clusterResource, selectedContainers, totalPreemptionAllowed); + } + } + + /** + * Compare by reversed priority order first, and then reversed containerId + * order + * @param containers + */ + @VisibleForTesting + static void sortContainers(List<RMContainer> containers){ + Collections.sort(containers, new Comparator<RMContainer>() { + @Override + public int compare(RMContainer a, RMContainer b) { + Comparator<Priority> c = new org.apache.hadoop.yarn.server + .resourcemanager.resource.Priority.Comparator(); + int priorityComp = c.compare(b.getContainer().getPriority(), + a.getContainer().getPriority()); + if (priorityComp != 0) { + return priorityComp; + } + return b.getContainerId().compareTo(a.getContainerId()); + } + }); + } + + private void addToPreemptMap( + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, + ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) { + Set<RMContainer> set; + if (null == (set = preemptMap.get(appAttemptId))) { + set = new HashSet<>(); + preemptMap.put(appAttemptId, set); + } + set.add(containerToPreempt); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java new file mode 100644 index 0000000..2217210 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * Calculate how much resources need to be preempted for each queue, + * will be used by {@link FifoCandidatesSelector} + */ +public class PreemptableResourceCalculator { + private static final Log LOG = + LogFactory.getLog(PreemptableResourceCalculator.class); + + private final CapacitySchedulerPreemptionContext context; + private final ResourceCalculator rc; + + static class TQComparator implements Comparator<TempQueuePerPartition> { + private ResourceCalculator rc; + private Resource clusterRes; + + TQComparator(ResourceCalculator rc, Resource clusterRes) { + this.rc = rc; + this.clusterRes = clusterRes; + } + + @Override + public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) { + if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) { + return -1; + } + if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) { + return 1; + } + return 0; + } + + // Calculates idealAssigned / guaranteed + // TempQueues with 0 guarantees are always considered the most over + // capacity and therefore considered last for resources. 
+    private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
+      // Ratio idealAssigned / guaranteed. A null queue, or one whose
+      // guarantee is not greater than zero, keeps the sentinel
+      // Integer.MAX_VALUE, i.e. the largest ratio this method can return.
+      double pctOver = Integer.MAX_VALUE;
+      if (q != null && Resources.greaterThan(
+          rc, clusterRes, q.guaranteed, Resources.none())) {
+        pctOver =
+            Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
+      }
+      return (pctOver);
+    }
+  }
+
+  /**
+   * Creates a calculator that works on the cloned queue state held by the
+   * given preemption context, using the context's resource calculator.
+   *
+   * @param preemptionContext source of the per-partition queues, the resource
+   *                          calculator and the preemption tunables
+   *                          (max-ignored-over-capacity, natural termination
+   *                          factor)
+   */
+  public PreemptableResourceCalculator(CapacitySchedulerPreemptionContext preemptionContext) {
+    context = preemptionContext;
+    rc = preemptionContext.getResourceCalculator();
+  }
+
+  /**
+   * Computes a normalizedGuarantee for each of the given (active) queues and
+   * stores it in {@code q.normalizedGuarantee}.
+   *
+   * @param rc resource calculator
+   * @param clusterResource reference total passed to
+   *                        {@code Resources.divide}; within this class the
+   *                        caller passes the still-unassigned resource, not
+   *                        the whole cluster
+   * @param queues the list of queues to consider
+   * @param ignoreGuar if true, every queue gets an equal share
+   *                   (1 / queues.size()); otherwise each queue's share is
+   *                   its guarantee divided by the sum of the guarantees of
+   *                   all given queues
+   */
+  private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
+      Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
+    // Sum of the guarantees of the given queues (only used when !ignoreGuar).
+    Resource activeCap = Resource.newInstance(0, 0);
+
+    if (ignoreGuar) {
+      for (TempQueuePerPartition q : queues) {
+        q.normalizedGuarantee = 1.0f / queues.size();
+      }
+    } else {
+      for (TempQueuePerPartition q : queues) {
+        Resources.addTo(activeCap, q.guaranteed);
+      }
+      for (TempQueuePerPartition q : queues) {
+        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+            q.guaranteed, activeCap);
+      }
+    }
+  }
+
+  // Take the most underserved TempQueue (the one on the head). Collect and
+  // return the list of all queues that have the same idealAssigned
+  // percentage of guaranteed.
+  // Side effect: every returned queue has been removed from orderedByNeed;
+  // the caller is responsible for re-adding queues that still need more.
+  protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
+      PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
+    ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
+    while (!orderedByNeed.isEmpty()) {
+      TempQueuePerPartition q1 = orderedByNeed.remove();
+      underserved.add(q1);
+      TempQueuePerPartition q2 = orderedByNeed.peek();
+      // q1's pct of guaranteed won't be larger than q2's. If it's less, then
+      // return what has already been collected. Otherwise, q1's pct of
+      // guaranteed == that of q2, so add q2 to underserved list during the
+      // next pass.
+      if (q2 == null || tqComparator.compare(q1,q2) < 0) {
+        return underserved;
+      }
+    }
+    return underserved;
+  }
+
+
+  /**
+   * Given a set of queues compute the fix-point distribution of unassigned
+   * resources among them. As the pending requests of a queue are satisfied,
+   * the queue is removed from the set and the remaining capacity is
+   * redistributed among the remaining queues. The distribution is weighted
+   * based on guaranteed capacity, unless asked to ignoreGuarantee, in which
+   * case resources are distributed uniformly.
+   *
+   * @param rc resource calculator
+   * @param tot_guarant reference resource used for all comparisons and for
+   *                    ordering queues by idealAssigned / guaranteed
+   * @param qAlloc queues to distribute resources to; each one's idealAssigned
+   *               is (re)initialized here, and offer() is then called on the
+   *               under-served ones
+   * @param unassigned in/out: resources still to assign; decremented in place
+   *                   as resources are handed out
+   * @param ignoreGuarantee if true, share resources equally instead of in
+   *                        proportion to the queues' guarantees
+   */
+  private void computeFixpointAllocation(ResourceCalculator rc,
+      Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
+      Resource unassigned, boolean ignoreGuarantee) {
+    // Prior to assigning the unused resources, process each queue as follows:
+    // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
+    // Else idealAssigned = current;
+    // Subtract idealAssigned resources from unassigned.
+    // If the queue has all of its needs met (that is, if
+    // idealAssigned >= current + pending), remove it from consideration.
+    // Sort queues from most under-guaranteed to most over-guaranteed.
+    TQComparator tqComparator = new TQComparator(rc, tot_guarant);
+    PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
+        tqComparator);
+    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueuePerPartition q = i.next();
+      if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
+        q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
+      } else {
+        q.idealAssigned = Resources.clone(q.current);
+      }
+      Resources.subtractFrom(unassigned, q.idealAssigned);
+      // If idealAssigned < (current + pending), q needs more resources, so
+      // add it to the list of underserved queues, ordered by need.
+      Resource curPlusPend = Resources.add(q.current, q.pending);
+      if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
+        orderedByNeed.add(q);
+      }
+    }
+
+    // Assign the remaining resources until there is no more demand or no
+    // resources are left. A queue is put back into orderedByNeed only if its
+    // last offer accepted something (wQdone > 0).
+    while (!orderedByNeed.isEmpty()
+        && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
+      Resource wQassigned = Resource.newInstance(0, 0);
+      // we compute normalizedGuarantees capacity based on currently active
+      // queues (every queue still in orderedByNeed, even though only the
+      // most underserved ones receive an offer in this round)
+      resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
+
+      // For each underserved queue (or set of queues if multiple are equally
+      // underserved), offer its share of the unassigned resources based on its
+      // normalized guarantee. After the offer, if the queue is not satisfied,
+      // place it back in the ordered list of queues, recalculating its place
+      // in the order of most under-guaranteed to most over-guaranteed. In this
+      // way, the most underserved queue(s) are always given resources first.
+      Collection<TempQueuePerPartition> underserved =
+          getMostUnderservedQueues(orderedByNeed, tqComparator);
+      for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
+          .hasNext();) {
+        TempQueuePerPartition sub = i.next();
+        // Share = unassigned * normalizedGuarantee, normalized up with a
+        // (1, 1) step factor.
+        Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
+            unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
+        // Presumably offer() returns the part of the offer the queue did not
+        // accept, so wQdone is the part it took -- confirm in
+        // TempQueuePerPartition.offer.
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+
+        if (Resources.greaterThan(rc, tot_guarant,
+            wQdone, Resources.none())) {
+          // The queue is still asking for more. Put it back in the priority
+          // queue, recalculating its order based on need.
+          orderedByNeed.add(sub);
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+  }
+
+  /**
+   * This method computes (for a single level in the tree, passed as a {@code
+   * List<TempQueuePerPartition>}) the ideal assignment of resources. It is
+   * applied level by level by recursivelyComputeIdealAssignment to allocate
+   * capacity fairly across all queues with pending demands. The allocation at
+   * this level terminates when no resources are left to assign, or when all
+   * demand is satisfied. Finally, assignPreemption is called on every queue
+   * with a scaling factor that keeps the total below totalPreemptionAllowed.
+   *
+   * @param rc resource calculator
+   * @param queues a list of cloned queues to be assigned capacity to (this is
+   * an out param)
+   * @param totalPreemptionAllowed total amount of preemption we allow
+   * @param tot_guarant the amount of capacity assigned to this pool of queues
+   */
+  private void computeIdealResourceDistribution(ResourceCalculator rc,
+      List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
+      Resource tot_guarant) {
+
+    // qAlloc is a copy of the input list; it is only iterated below to split
+    // the queues by guarantee (the shrinking set of still-active queues is
+    // tracked inside computeFixpointAllocation)
+    List<TempQueuePerPartition> qAlloc = new ArrayList<>(queues);
+    // unassigned tracks how much resources are still to assign, initialized
+    // with the total capacity for this set of queues (a clone, so tot_guarant
+    // itself is not modified)
+    Resource unassigned = Resources.clone(tot_guarant);
+
+    // group queues based on whether they have non-zero guaranteed capacity
+    Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<>();
+    Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>();
+
+    for (TempQueuePerPartition q : qAlloc) {
+      if (Resources
+          .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+        nonZeroGuarQueues.add(q);
+      } else {
+        zeroGuarQueues.add(q);
+      }
+    }
+
+    // first compute the allocation as a fixpoint based on guaranteed capacity
+    computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+        false);
+
+    // if any capacity is left unassigned, distribute it among zero-guarantee
+    // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
+    // NOTE(review): when this branch is skipped, zero-guarantee queues keep the
+    // idealAssigned they already had (initialized outside this class --
+    // confirm its starting value).
+    if (!zeroGuarQueues.isEmpty()
+        && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
+      computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+          true);
+    }
+
+    // based on ideal assignment computed above and current assignment we derive
+    // how much preemption is required overall
+    Resource totPreemptionNeeded = Resource.newInstance(0, 0);
+    for (TempQueuePerPartition t:queues) {
+      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
+        Resources.addTo(totPreemptionNeeded,
+            Resources.subtract(t.current, t.idealAssigned));
+      }
+    }
+
+    // if we need to preempt more than is allowed, compute a factor (0<f<1)
+    // that is used to scale down how much we ask back from each queue
+    float scalingFactor = 1.0F;
+    if (Resources.greaterThan(rc, tot_guarant,
+        totPreemptionNeeded, totalPreemptionAllowed)) {
+      scalingFactor = Resources.divide(rc, tot_guarant,
+          totalPreemptionAllowed, totPreemptionNeeded);
+    }
+
+    // assign to each queue the amount of actual preemption based on local
+    // information of ideal preemption and scaling factor
+    for (TempQueuePerPartition t : queues) {
+      t.assignPreemption(scalingFactor, rc, tot_guarant);
+    }
+    if (LOG.isDebugEnabled()) {
+      for (TempQueuePerPartition t : queues) {
+        LOG.debug(t);
+      }
+    }
+
+  }
+
+  /**
+   * This method recursively computes the ideal assignment of resources to each
+   * level of the hierarchy. This ensures that leafs that are over-capacity but
+   * with parents within capacity will not be preempted. Preemptions are
+   * allowed within each subtree according to local over/under capacity.
+   * Nothing is returned: results are written into the cloned queues
+   * (idealAssigned, plus whatever assignPreemption sets).
+   *
+   * @param root the root of the cloned queue hierarchy; its idealAssigned is
+   *             the capacity distributed among its children
+   * @param totalPreemptionAllowed maximum amount of preemption allowed
+   */
+  private void recursivelyComputeIdealAssignment(
+      TempQueuePerPartition root, Resource totalPreemptionAllowed) {
+    if (root.getChildren() != null &&
+        root.getChildren().size() > 0) {
+      // compute ideal distribution at this level
+      // NOTE(review): the same totalPreemptionAllowed cap is applied
+      // independently at every level; it is not reduced as levels use it.
+      computeIdealResourceDistribution(rc, root.getChildren(),
+          totalPreemptionAllowed, root.idealAssigned);
+      // recurse into each child; queues without children end the recursion
+      for(TempQueuePerPartition t : root.getChildren()) {
+        recursivelyComputeIdealAssignment(t, totalPreemptionAllowed);
+      }
+    }
+  }
+
+
+  /**
+   * Sets {@code actuallyToBePreempted} on every partition of every leaf queue
+   * in {@code leafQueueNames}, except for queues whose NO_LABEL partition has
+   * preemptionDisabled set (those are skipped entirely and left untouched).
+   * A partition whose current usage exceeds
+   * guaranteed * (1 + maxIgnoreOverCapacity) gets its toBePreempted scaled by
+   * the natural termination factor; every other partition gets
+   * {@code Resources.none()}.
+   *
+   * @param leafQueueNames names of the leaf queues to process
+   * @param clusterResource reference resource for the resource comparisons
+   */
+  private void calculateResToObtainByPartitionForLeafQueues(
+      Set<String> leafQueueNames, Resource clusterResource) {
+    // Loop all leaf queues
+    for (String queueName : leafQueueNames) {
+      // check if preemption disabled for the queue
+      // (read from the NO_LABEL partition; applies to all of its partitions)
+      if (context.getQueueByPartition(queueName,
+          RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("skipping from queue=" + queueName + " because it's a non-preemptable queue");
+        }
+        continue;
+      }
+
+      // compute resToObtainByPartition considered inter-queue preemption
+      for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) {
+        // we act only if we are violating balance by more than
+        // maxIgnoredOverCapacity
+        if (Resources.greaterThan(rc, clusterResource, qT.current,
+            Resources.multiply(qT.guaranteed, 1.0 + context.getMaxIgnoreOverCapacity()))) {
+          // we introduce a dampening factor naturalTerminationFactor that
+          // accounts for natural termination of containers
+          Resource resToObtain = Resources.multiply(qT.toBePreempted,
+              context.getNaturalTerminationFactor());
+          // Only log when resToObtain > 0; actuallyToBePreempted is assigned
+          // below regardless of this check.
+          if (Resources.greaterThan(rc, clusterResource, resToObtain,
+              Resources.none())) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Queue=" + queueName + " partition=" + qT.partition + " resource-to-obtain=" + resToObtain);
+            }
+          }
+          qT.actuallyToBePreempted = Resources.clone(resToObtain);
+        } else {
+          // NOTE(review): Resources.none() presumably returns a shared
+          // constant rather than a fresh zero Resource; confirm that no
+          // caller mutates actuallyToBePreempted in place.
+          qT.actuallyToBePreempted = Resources.none();
+        }
+      }
+    }
+  }
+
+  /**
+   * Computes the ideal allocation for every partition, starting from each
+   * partition's root queue, and then sets actuallyToBePreempted on the leaf
+   * queues via calculateResToObtainByPartitionForLeafQueues.
+   *
+   * @param clusterResource reference resource for the comparisons made while
+   *                        computing resource-to-obtain
+   * @param totalPreemptionAllowed maximum amount of preemption allowed; the
+   *                               same value is passed to every level of
+   *                               every partition
+   */
+  public void computeIdealAllocation(Resource clusterResource,
+      Resource totalPreemptionAllowed) {
+    for (String partition : context.getAllPartitions()) {
+      TempQueuePerPartition tRoot =
+          context.getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
+      // compute the ideal distribution of resources among queues
+      // updates cloned queues state accordingly
+      // NOTE(review): idealAssigned becomes the same Resource object as
+      // guaranteed (no clone); within this class it is only read or cloned,
+      // but confirm nothing else mutates it in place.
+      tRoot.idealAssigned = tRoot.guaranteed;
+      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+    }
+
+    // based on the ideal allocation, calculate resource-to-obtain by
+    // partition for each leaf queue
+    calculateResToObtainByPartitionForLeafQueues(context.getLeafQueueNames(),
+        clusterResource);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60e4116b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
new file mode 100644
index 0000000..dd33d8f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
@@ -0,0
+1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for policies that select containers to preempt.
+ * <p>
+ * Each implementation receives the candidates already selected by previous
+ * policies and returns the merged selection (those candidates plus its own).
+ */
+public abstract class PreemptionCandidatesSelector {
+  /** Preemption context passed to the constructor. */
+  protected CapacitySchedulerPreemptionContext preemptionContext;
+  /** Resource calculator obtained from {@link #preemptionContext}. */
+  protected ResourceCalculator rc;
+
+  /**
+   * Binds this selector to a preemption context and caches the context's
+   * resource calculator in {@link #rc}.
+   *
+   * @param preemptionContext preemption context to read from; must not be
+   *                          null, since it is dereferenced here
+   */
+  PreemptionCandidatesSelector(
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    this.preemptionContext = preemptionContext;
+    this.rc = preemptionContext.getResourceCalculator();
+  }
+
+  /**
+   * Get preemption candidates from computed resource sharing and already
+   * selected candidates.
+   *
+   * @param selectedCandidates already selected candidates from previous
+   *                           policies
+   * @param clusterResource cluster resource supplied by the caller
+   *                        (presumably the total cluster capacity; confirm
+   *                        against the invoking policy)
+   * @param totalPreemptedResourceAllowed how many resources allowed to be
+   *                                      preempted in this round
+   * @return merged selected candidates.
+   */
+  public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource, Resource totalPreemptedResourceAllowed);
+}