Author: sandy
Date: Thu May 29 04:01:24 2014
New Revision: 1598197
URL: http://svn.apache.org/r1598197
Log:
YARN-596. Use scheduling policies throughout the queue hierarchy to decide
which containers to preempt (Wei Yan via Sandy Ryza)
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu May 29 04:01:24 2014
@@ -114,6 +114,9 @@ Release 2.5.0 - UNRELEASED
YARN-2107. Refactored timeline classes into o.a.h.y.s.timeline package.
(Vinod
Kumar Vavilapalli via zjshen)
+ YARN-596. Use scheduling policies throughout the queue hierarchy to decide
+ which containers to preempt (Wei Yan via Sandy Ryza)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
Thu May 29 04:01:24 2014
@@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,8 +33,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -58,6 +58,8 @@ public class AppSchedulable extends Sche
private Priority priority;
private ResourceWeights resourceWeights;
+ private RMContainerComparator comparator = new RMContainerComparator();
+
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app,
FSLeafQueue queue) {
this.scheduler = scheduler;
this.app = app;
@@ -111,7 +113,10 @@ public class AppSchedulable extends Sche
@Override
public Resource getResourceUsage() {
- return app.getCurrentConsumption();
+ // Here the getPreemptedResources() always return zero, except in
+ // a preemption round
+ return Resources.subtract(app.getCurrentConsumption(),
+ app.getPreemptedResources());
}
@@ -384,6 +389,27 @@ public class AppSchedulable extends Sche
}
/**
+ * Preempt a running container according to the priority
+ */
+ @Override
+ public RMContainer preemptContainer() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("App " + getName() + " is going to preempt a running " +
+ "container");
+ }
+
+ RMContainer toBePreempted = null;
+ for (RMContainer container : app.getLiveContainers()) {
+ if (! app.getPreemptionContainers().contains(container) &&
+ (toBePreempted == null ||
+ comparator.compare(toBePreempted, container) > 0)) {
+ toBePreempted = container;
+ }
+ }
+ return toBePreempted;
+ }
+
+ /**
* Whether this app has containers requests that could be satisfied on the
* given node, if the node had full space.
*/
@@ -407,4 +433,17 @@ public class AppSchedulable extends Sche
Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
anyRequest.getCapability(), node.getRMNode().getTotalCapability());
}
+
+ static class RMContainerComparator implements Comparator<RMContainer>,
+ Serializable {
+ @Override
+ public int compare(RMContainer c1, RMContainer c2) {
+ int ret = c1.getContainer().getPriority().compareTo(
+ c2.getContainer().getPriority());
+ if (ret == 0) {
+ return c2.getContainerId().compareTo(c1.getContainerId());
+ }
+ return ret;
+ }
+ }
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
Thu May 29 04:01:24 2014
@@ -33,10 +33,10 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
-import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@Private
@Unstable
@@ -209,6 +209,36 @@ public class FSLeafQueue extends FSQueue
}
@Override
+ public RMContainer preemptContainer() {
+ RMContainer toBePreempted = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queue " + getName() + " is going to preempt a container " +
+ "from its applications.");
+ }
+
+ // If this queue is not over its fair share, reject
+ if (!preemptContainerPreCheck()) {
+ return toBePreempted;
+ }
+
+ // Choose the app that is most over fair share
+ Comparator<Schedulable> comparator = policy.getComparator();
+ AppSchedulable candidateSched = null;
+ for (AppSchedulable sched : runnableAppScheds) {
+ if (candidateSched == null ||
+ comparator.compare(sched, candidateSched) > 0) {
+ candidateSched = sched;
+ }
+ }
+
+ // Preempt from the selected app
+ if (candidateSched != null) {
+ toBePreempted = candidateSched.preemptContainer();
+ }
+ return toBePreempted;
+ }
+
+ @Override
public List<FSQueue> getChildQueues() {
return new ArrayList<FSQueue>(1);
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
Thu May 29 04:01:24 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -32,6 +33,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -157,6 +159,32 @@ public class FSParentQueue extends FSQue
}
@Override
+ public RMContainer preemptContainer() {
+ RMContainer toBePreempted = null;
+
+ // If this queue is not over its fair share, reject
+ if (!preemptContainerPreCheck()) {
+ return toBePreempted;
+ }
+
+ // Find the childQueue which is most over fair share
+ FSQueue candidateQueue = null;
+ Comparator<Schedulable> comparator = policy.getComparator();
+ for (FSQueue queue : childQueues) {
+ if (candidateQueue == null ||
+ comparator.compare(queue, candidateQueue) > 0) {
+ candidateQueue = queue;
+ }
+ }
+
+ // Let the selected queue choose which of its container to preempt
+ if (candidateQueue != null) {
+ toBePreempted = candidateQueue.preemptContainer();
+ }
+ return toBePreempted;
+ }
+
+ @Override
public List<FSQueue> getChildQueues() {
return childQueues;
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
Thu May 29 04:01:24 2014
@@ -187,4 +187,17 @@ public abstract class FSQueue extends Sc
}
return true;
}
+
+ /**
+ * Helper method to check if the queue should preempt containers
+ *
+ * @return true if check passes (can preempt) or false otherwise
+ */
+ protected boolean preemptContainerPreCheck() {
+ if (this == scheduler.getQueueManager().getRootQueue()) {
+ return true;
+ }
+ return parent.getPolicy()
+ .checkIfUsageOverFairShare(getResourceUsage(), getFairShare());
+ }
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
Thu May 29 04:01:24 2014
@@ -59,6 +59,8 @@ public class FSSchedulerApp extends Sche
private AppSchedulable appSchedulable;
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer,
Long>();
+
+ private Resource preemptedResources = Resources.createResource(0);
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
@@ -316,6 +318,7 @@ public class FSSchedulerApp extends Sche
public void addPreemption(RMContainer container, long time) {
assert preemptionMap.get(container) == null;
preemptionMap.put(container, time);
+ Resources.addTo(preemptedResources, container.getAllocatedResource());
}
public Long getContainerPreemptionTime(RMContainer container) {
@@ -330,4 +333,20 @@ public class FSSchedulerApp extends Sche
public FSLeafQueue getQueue() {
return (FSLeafQueue)super.getQueue();
}
+
+ public Resource getPreemptedResources() {
+ return preemptedResources;
+ }
+
+ public void resetPreemptedResources() {
+ preemptedResources = Resources.createResource(0);
+ for (RMContainer container : getPreemptionContainers()) {
+ Resources.addTo(preemptedResources, container.getAllocatedResource());
+ }
+ }
+
+ public void clearPreemptedResources() {
+ preemptedResources.setMemory(0);
+ preemptedResources.setVirtualCores(0);
+ }
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
Thu May 29 04:01:24 2014
@@ -20,14 +20,11 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -337,94 +334,78 @@ public class FairScheduler extends
}
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
resToPreempt,
Resources.none())) {
- preemptResources(queueMgr.getLeafQueues(), resToPreempt);
+ preemptResources(resToPreempt);
}
}
/**
- * Preempt a quantity of resources from a list of QueueSchedulables. The
- * policy for this is to pick apps from queues that are over their fair
share,
- * but make sure that no queue is placed below its fair share in the process.
- * We further prioritize preemption by choosing containers with lowest
- * priority to preempt.
+ * Preempt a quantity of resources. Each round, we start from the root queue,
+ * level-by-level, until choosing a candidate application.
+ * The policy for prioritizing preemption for each queue depends on its
+ * SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is
+ * most over its fair share; (2) FIFO, choose the childSchedulable that is
+ * latest launched.
+ * Inside each application, we further prioritize preemption by choosing
+ * containers with lowest priority to preempt.
+ * We make sure that no queue is placed below its fair share in the process.
*/
- protected void preemptResources(Collection<FSLeafQueue> scheds,
- Resource toPreempt) {
- if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
+ protected void preemptResources(Resource toPreempt) {
+ if (Resources.equals(toPreempt, Resources.none())) {
return;
}
- Map<RMContainer, FSSchedulerApp> apps =
- new HashMap<RMContainer, FSSchedulerApp>();
- Map<RMContainer, FSLeafQueue> queues =
- new HashMap<RMContainer, FSLeafQueue>();
-
- // Collect running containers from over-scheduled queues
- List<RMContainer> runningContainers = new ArrayList<RMContainer>();
- for (FSLeafQueue sched : scheds) {
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
- sched.getResourceUsage(), sched.getFairShare())) {
- for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
- for (RMContainer c : as.getApp().getLiveContainers()) {
- runningContainers.add(c);
- apps.put(c, as.getApp());
- queues.put(c, sched);
- }
- }
- }
- }
-
- // Sort containers into reverse order of priority
- Collections.sort(runningContainers, new Comparator<RMContainer>() {
- public int compare(RMContainer c1, RMContainer c2) {
- int ret = c1.getContainer().getPriority().compareTo(
- c2.getContainer().getPriority());
- if (ret == 0) {
- return c2.getContainerId().compareTo(c1.getContainerId());
- }
- return ret;
- }
- });
-
// Scan down the list of containers we've already warned and kill them
// if we need to. Remove any containers from the list that we don't need
// or that are no longer running.
Iterator<RMContainer> warnedIter = warnedContainers.iterator();
- Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
while (warnedIter.hasNext()) {
RMContainer container = warnedIter.next();
- if (container.getState() == RMContainerState.RUNNING &&
+ if ((container.getState() == RMContainerState.RUNNING ||
+ container.getState() == RMContainerState.ALLOCATED) &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
- warnOrKillContainer(container, apps.get(container),
queues.get(container));
- preemptedThisRound.add(container);
+ warnOrKillContainer(container);
Resources.subtractFrom(toPreempt,
container.getContainer().getResource());
} else {
warnedIter.remove();
}
}
- // Scan down the rest of the containers until we've preempted enough,
making
- // sure we don't preempt too many from any queue
- Iterator<RMContainer> runningIter = runningContainers.iterator();
- while (runningIter.hasNext() &&
- Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
- toPreempt, Resources.none())) {
- RMContainer container = runningIter.next();
- FSLeafQueue sched = queues.get(container);
- if (!preemptedThisRound.contains(container) &&
- Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
- sched.getResourceUsage(), sched.getFairShare())) {
- warnOrKillContainer(container, apps.get(container), sched);
-
- warnedContainers.add(container);
- Resources.subtractFrom(toPreempt,
container.getContainer().getResource());
+ try {
+ // Reset preemptedResource for each app
+ for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
+ for (AppSchedulable app : queue.getRunnableAppSchedulables()) {
+ app.getApp().resetPreemptedResources();
+ }
+ }
+
+ while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
+ toPreempt, Resources.none())) {
+ RMContainer container =
+ getQueueManager().getRootQueue().preemptContainer();
+ if (container == null) {
+ break;
+ } else {
+ warnOrKillContainer(container);
+ warnedContainers.add(container);
+ Resources.subtractFrom(
+ toPreempt, container.getContainer().getResource());
+ }
+ }
+ } finally {
+ // Clear preemptedResources for each app
+ for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
+ for (AppSchedulable app : queue.getRunnableAppSchedulables()) {
+ app.getApp().clearPreemptedResources();
+ }
}
}
}
- private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
- FSLeafQueue queue) {
+ private void warnOrKillContainer(RMContainer container) {
+ ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
+ FSSchedulerApp app = getSchedulerApp(appAttemptId);
+ FSLeafQueue queue = app.getQueue();
LOG.info("Preempting container (prio=" +
container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
") from queue " + queue.getName());
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
Thu May 29 04:01:24 2014
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -100,6 +101,11 @@ public abstract class Schedulable {
*/
public abstract Resource assignContainer(FSSchedulerNode node);
+ /**
+ * Preempt a container from this Schedulable if possible.
+ */
+ public abstract RMContainer preemptContainer();
+
/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare) {
this.fairShare = fairShare;
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
Thu May 29 04:01:24 2014
@@ -139,4 +139,14 @@ public abstract class SchedulingPolicy {
*/
public abstract void computeShares(
Collection<? extends Schedulable> schedulables, Resource totalResources);
+
+ /**
+ * Check if the resource usage is over the fair share under this policy
+ *
+ * @param usage {@link Resource} the resource usage
+ * @param fairShare {@link Resource} the fair share
+ * @return true if check passes (is over) or false otherwise
+ */
+ public abstract boolean checkIfUsageOverFairShare(
+ Resource usage, Resource fairShare);
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
Thu May 29 04:01:24 2014
@@ -70,6 +70,11 @@ public class DominantResourceFairnessPol
}
@Override
+ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare)
{
+ return !Resources.fitsIn(usage, fairShare);
+ }
+
+ @Override
public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity);
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
Thu May 29 04:01:24 2014
@@ -120,6 +120,11 @@ public class FairSharePolicy extends Sch
}
@Override
+ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare)
{
+ return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
+ }
+
+ @Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_ANY;
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
Thu May 29 04:01:24 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.re
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
-import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -89,6 +88,13 @@ public class FifoPolicy extends Scheduli
}
@Override
+ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare)
{
+ throw new UnsupportedOperationException(
+ "FifoPolicy doesn't support checkIfUsageOverFairshare operation, " +
+ "as FifoPolicy only works for FSLeafQueue.");
+ }
+
+ @Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_LEAF;
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
Thu May 29 04:01:24 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -84,6 +85,11 @@ public class FakeSchedulable extends Sch
}
@Override
+ public RMContainer preemptContainer() {
+ return null;
+ }
+
+ @Override
public Resource getDemand() {
return null;
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Thu May 29 04:01:24 2014
@@ -1029,13 +1029,13 @@ public class TestFairScheduler extends F
@Test (timeout = 5000)
/**
- * Make sure containers are chosen to be preempted in the correct order.
Right
- * now this means decreasing order of priority.
+ * Make sure containers are chosen to be preempted in the correct order.
*/
public void testChoiceOfPreemptedContainers() throws Exception {
conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
- conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file",
ALLOC_FILE);
+ conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
MockClock clock = new MockClock();
scheduler.setClock(clock);
@@ -1052,7 +1052,7 @@ public class TestFairScheduler extends F
out.println("<queue name=\"queueC\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
- out.println("<queue name=\"queueD\">");
+ out.println("<queue name=\"default\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("</allocations>");
@@ -1060,133 +1060,132 @@ public class TestFairScheduler extends F
scheduler.reinitialize(conf, resourceManager.getRMContext());
- // Create four nodes
+ // Create two nodes
RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
- RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
- "127.0.0.3");
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
- scheduler.handle(nodeEvent3);
-
-
- // Queue A and B each request three containers
+ // Queue A and B each request two applications
ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+ createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
+ ApplicationAttemptId app3 =
+ createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
- ApplicationAttemptId app5 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
- ApplicationAttemptId app6 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+ createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
scheduler.update();
+ scheduler.getQueueManager().getLeafQueue("queueA", true)
+ .setPolicy(SchedulingPolicy.parse("fifo"));
+ scheduler.getQueueManager().getLeafQueue("queueB", true)
+ .setPolicy(SchedulingPolicy.parse("fair"));
+
// Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 2; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new
NodeUpdateSchedulerEvent(node1);
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ for (int i = 0; i < 4; i++) {
scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new
NodeUpdateSchedulerEvent(node2);
scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new
NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
}
- assertEquals(1,
scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(1,
scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(1,
scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(1,
scheduler.getSchedulerApp(app4).getLiveContainers().size());
- assertEquals(1,
scheduler.getSchedulerApp(app5).getLiveContainers().size());
- assertEquals(1,
scheduler.getSchedulerApp(app6).getLiveContainers().size());
-
- // Now new requests arrive from queues C and D
- ApplicationAttemptId app7 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
- ApplicationAttemptId app8 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
- ApplicationAttemptId app9 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
- ApplicationAttemptId app10 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
- ApplicationAttemptId app11 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
- ApplicationAttemptId app12 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
-
- scheduler.update();
-
- // We should be able to claw back one container from A and B each.
- // Make sure it is lowest priority container.
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
- Resources.createResource(2 * 1024));
- assertEquals(1,
scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(1,
scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(1,
scheduler.getSchedulerApp(app4).getLiveContainers().size());
- assertEquals(1,
scheduler.getSchedulerApp(app5).getLiveContainers().size());
-
- // First verify we are adding containers to preemption list for the
application
-
assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(),
-
scheduler.getSchedulerApp(app3).getPreemptionContainers()));
-
assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(),
-
scheduler.getSchedulerApp(app6).getPreemptionContainers()));
+ assertEquals(2,
scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(2,
scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(2,
scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(2,
scheduler.getSchedulerApp(app4).getLiveContainers().size());
+
+ // Now new requests arrive from queueC and default
+ createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+ scheduler.update();
+
+ // We should be able to claw back one container from queueA and queueB
each.
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ assertEquals(2,
scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(2,
scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+ // First verify we are adding containers to preemption list for the app.
+ // For queueA (fifo), app2 is selected.
+ // For queueB (fair), app4 is selected.
+ assertTrue("App2 should have container to be preempted",
+ !Collections.disjoint(
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+ assertTrue("App4 should have container to be preempted",
+ !Collections.disjoint(
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
// Pretend 15 seconds have passed
clock.tick(15);
// Trigger a kill by insisting we want containers back
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
- Resources.createResource(2 * 1024));
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
// At this point the containers should have been killed (since we are not
simulating AM)
- assertEquals(0,
scheduler.getSchedulerApp(app6).getLiveContainers().size());
- assertEquals(0,
scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(1,
scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(1,
scheduler.getSchedulerApp(app4).getLiveContainers().size());
+ // Inside each app, containers are sorted according to their priorities.
+ // Containers with priority 4 are preempted for app2 and app4.
+ Set<RMContainer> set = new HashSet<RMContainer>();
+ for (RMContainer container :
+ scheduler.getSchedulerApp(app2).getLiveContainers()) {
+ if (container.getAllocatedPriority().getPriority() == 4) {
+ set.add(container);
+ }
+ }
+ for (RMContainer container :
+ scheduler.getSchedulerApp(app4).getLiveContainers()) {
+ if (container.getAllocatedPriority().getPriority() == 4) {
+ set.add(container);
+ }
+ }
+ assertTrue("Containers with priority=4 in app2 and app4 should be " +
+ "preempted.", set.isEmpty());
// Trigger a kill by insisting we want containers back
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
- Resources.createResource(2 * 1024));
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
// Pretend 15 seconds have passed
clock.tick(15);
// We should be able to claw back another container from A and B each.
- // Make sure it is lowest priority container.
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
- Resources.createResource(2 * 1024));
-
- assertEquals(1,
scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ // For queueA (fifo), continue preempting from app2.
+ // For queueB (fair), even app4 has a lowest priority container with p=4,
it
+ // still preempts from app3 as app3 is most over fair share.
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+ assertEquals(2,
scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(0,
scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(0,
scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(1,
scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(1,
scheduler.getSchedulerApp(app4).getLiveContainers().size());
- assertEquals(0,
scheduler.getSchedulerApp(app5).getLiveContainers().size());
- assertEquals(0,
scheduler.getSchedulerApp(app6).getLiveContainers().size());
// Now A and B are below fair share, so preemption shouldn't do anything
- scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
- Resources.createResource(2 * 1024));
- assertEquals(1,
scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(0,
scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(0,
scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(1,
scheduler.getSchedulerApp(app4).getLiveContainers().size());
- assertEquals(0,
scheduler.getSchedulerApp(app5).getLiveContainers().size());
- assertEquals(0,
scheduler.getSchedulerApp(app6).getLiveContainers().size());
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ assertTrue("App1 should have no container to be preempted",
+ scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
+ assertTrue("App2 should have no container to be preempted",
+ scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
+ assertTrue("App3 should have no container to be preempted",
+ scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
+ assertTrue("App4 should have no container to be preempted",
+ scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
}
@Test (timeout = 5000)
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java?rev=1598197&r1=1598196&r2=1598197&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
Thu May 29 04:01:24 2014
@@ -35,10 +35,8 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
-import java.util.Collection;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
@@ -51,8 +49,7 @@ public class TestFairSchedulerPreemption
public int lastPreemptMemory = -1;
@Override
- protected void preemptResources(
- Collection<FSLeafQueue> scheds, Resource toPreempt) {
+ protected void preemptResources(Resource toPreempt) {
lastPreemptMemory = toPreempt.getMemory();
}