Author: cdouglas
Date: Thu May 9 23:29:38 2013
New Revision: 1480837
URL: http://svn.apache.org/r1480837
Log:
YARN-568. Add support for work preserving preemption to the FairScheduler.
Contributed by Carlo Curino and Sandy Ryza
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1480837&r1=1480836&r2=1480837&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu May 9 23:29:38 2013
@@ -160,6 +160,9 @@ Release 2.0.5-beta - UNRELEASED
tokens for app attempt so that RM can be restarted while preserving current
applications. (Jian He via vinodkv)
+ YARN-568. Add support for work preserving preemption to the FairScheduler.
+ (Carlo Curino and Sandy Ryza via cdouglas)
+
YARN-598. Add virtual cores to queue metrics. (sandyr via tucu)
OPTIMIZATIONS
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1480837&r1=1480836&r2=1480837&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
Thu May 9 23:29:38 2013
@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -39,6 +41,11 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -341,9 +348,65 @@ public class ApplicationMasterService ex
}
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
+
+ // add preemption to the allocateResponse message (if any)
+ allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
+
return allocateResponse;
}
}
+
+ private PreemptionMessage generatePreemptionMessage(Allocation allocation){
+ PreemptionMessage pMsg = null;
+ // assemble strict preemption request
+ if (allocation.getStrictContainerPreemptions() != null) {
+ pMsg =
+ recordFactory.newRecordInstance(PreemptionMessage.class);
+ StrictPreemptionContract pStrict =
+ recordFactory.newRecordInstance(StrictPreemptionContract.class);
+ Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+ for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
+ PreemptionContainer pc =
+ recordFactory.newRecordInstance(PreemptionContainer.class);
+ pc.setId(cId);
+ pCont.add(pc);
+ }
+ pStrict.setContainers(pCont);
+ pMsg.setStrictContract(pStrict);
+ }
+
+ // assemble negotiable preemption request
+ if (allocation.getResourcePreemptions() != null &&
+ allocation.getResourcePreemptions().size() > 0 &&
+ allocation.getContainerPreemptions() != null &&
+ allocation.getContainerPreemptions().size() > 0) {
+ if (pMsg == null) {
+ pMsg =
+ recordFactory.newRecordInstance(PreemptionMessage.class);
+ }
+ PreemptionContract contract =
+ recordFactory.newRecordInstance(PreemptionContract.class);
+ Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
+ for (ContainerId cId : allocation.getContainerPreemptions()) {
+ PreemptionContainer pc =
+ recordFactory.newRecordInstance(PreemptionContainer.class);
+ pc.setId(cId);
+ pCont.add(pc);
+ }
+ List<PreemptionResourceRequest> pRes = new ArrayList<PreemptionResourceRequest>();
+ for (ResourceRequest crr : allocation.getResourcePreemptions()) {
+ PreemptionResourceRequest prr =
+ recordFactory.newRecordInstance(PreemptionResourceRequest.class);
+ prr.setResourceRequest(crr);
+ pRes.add(prr);
+ }
+ contract.setContainers(pCont);
+ contract.setResourceRequest(pRes);
+ pMsg.setContract(contract);
+ }
+
+ return pMsg;
+ }
public void registerAppAttempt(ApplicationAttemptId attemptId) {
AllocateResponse response =
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java?rev=1480837&r1=1480836&r2=1480837&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
Thu May 9 23:29:38 2013
@@ -19,17 +19,43 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class Allocation {
+
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
final List<Container> containers;
final Resource resourceLimit;
-
+ final Set<ContainerId> strictContainers;
+ final Set<ContainerId> fungibleContainers;
+ final List<ResourceRequest> fungibleResources;
+
public Allocation(List<Container> containers, Resource resourceLimit) {
+ this(containers, resourceLimit, null, null, null);
+ }
+
+ public Allocation(List<Container> containers, Resource resourceLimit,
+ Set<ContainerId> strictContainers) {
+ this(containers, resourceLimit, strictContainers, null, null);
+ }
+
+ public Allocation(List<Container> containers, Resource resourceLimit,
+ Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
+ List<ResourceRequest> fungibleResources) {
this.containers = containers;
this.resourceLimit = resourceLimit;
+ this.strictContainers = strictContainers;
+ this.fungibleContainers = fungibleContainers;
+ this.fungibleResources = fungibleResources;
}
public List<Container> getContainers() {
@@ -39,5 +65,17 @@ public class Allocation {
public Resource getResourceLimit() {
return resourceLimit;
}
-
+
+ public Set<ContainerId> getStrictContainerPreemptions() {
+ return strictContainers;
+ }
+
+ public Set<ContainerId> getContainerPreemptions() {
+ return fungibleContainers;
+ }
+
+ public List<ResourceRequest> getResourcePreemptions() {
+ return fungibleResources;
+ }
+
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1480837&r1=1480836&r2=1480837&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
Thu May 9 23:29:38 2013
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -86,7 +87,9 @@ public class FSSchedulerApp extends Sche
final Map<Priority, Map<NodeId, RMContainer>> reservedContainers =
new HashMap<Priority, Map<NodeId, RMContainer>>();
-
+
+ final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
+
/**
* Count how many times the application has been given an opportunity
* to schedule a task at each priority. Each time the scheduler
@@ -233,6 +236,9 @@ public class FSSchedulerApp extends Sche
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
+
+ // remove from preemption map if it is completed
+ preemptionMap.remove(rmContainer);
}
synchronized public List<Container> pullNewlyAllocatedContainers() {
@@ -574,4 +580,18 @@ public class FSSchedulerApp extends Sche
" priority " + priority);
allowedLocalityLevel.put(priority, level);
}
+
+ // related methods
+ public void addPreemption(RMContainer container, long time) {
+ assert preemptionMap.get(container) == null;
+ preemptionMap.put(container, time);
+ }
+
+ public Long getContainerPreemptionTime(RMContainer container) {
+ return preemptionMap.get(container);
+ }
+
+ public Set<RMContainer> getPreemptionContainers() {
+ return preemptionMap.keySet();
+ }
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1480837&r1=1480836&r2=1480837&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
Thu May 9 23:29:38 2013
@@ -24,8 +24,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@@ -155,10 +158,16 @@ public class FairScheduler implements Re
private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
- // How often tasks are preempted (must be longer than a couple
+ // How often tasks are preempted
+ protected long preemptionInterval;
+
+ // ms to wait before force killing stuff (must be longer than a couple
// of heartbeats to give task-kill commands a chance to act).
- protected long preemptionInterval = 15000;
-
+ protected long waitTimeBeforeKill;
+
+ // Containers whose AMs have been warned that they will be preempted soon.
+ private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
+
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
@@ -331,34 +340,78 @@ public class FairScheduler implements Re
// Sort containers into reverse order of priority
Collections.sort(runningContainers, new Comparator<RMContainer>() {
public int compare(RMContainer c1, RMContainer c2) {
- return c2.getContainer().getPriority().compareTo(
+ int ret = c2.getContainer().getPriority().compareTo(
c1.getContainer().getPriority());
+ if (ret == 0) {
+ return c2.getContainerId().compareTo(c1.getContainerId());
+ }
+ return ret;
}
});
+
+ // Scan down the list of containers we've already warned and kill them
+ // if we need to. Remove any containers from the list that we don't need
+ // or that are no longer running.
+ Iterator<RMContainer> warnedIter = warnedContainers.iterator();
+ Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
+ while (warnedIter.hasNext()) {
+ RMContainer container = warnedIter.next();
+ if (container.getState() == RMContainerState.RUNNING &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ toPreempt, Resources.none())) {
+ warnOrKillContainer(container, apps.get(container), queues.get(container));
+ preemptedThisRound.add(container);
+ Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+ } else {
+ warnedIter.remove();
+ }
+ }
- // Scan down the sorted list of task statuses until we've killed enough
- // tasks, making sure we don't kill too many from any queue
- for (RMContainer container : runningContainers) {
+ // Scan down the rest of the containers until we've preempted enough, making
+ // sure we don't preempt too many from any queue
+ Iterator<RMContainer> runningIter = runningContainers.iterator();
+ while (runningIter.hasNext() &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ toPreempt, Resources.none())) {
+ RMContainer container = runningIter.next();
FSLeafQueue sched = queues.get(container);
- if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
- sched.getResourceUsage(), sched.getFairShare())) {
- LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
- "res=" + container.getContainer().getResource() +
- ") from queue " + sched.getName());
- ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
+ if (!preemptedThisRound.contains(container) &&
+ Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
+ sched.getResourceUsage(), sched.getFairShare())) {
+ warnOrKillContainer(container, apps.get(container), sched);
+
+ warnedContainers.add(container);
+ Resources.subtractFrom(toPreempt, container.getContainer().getResource());
+ }
+ }
+ }
+
+ private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
+ FSLeafQueue queue) {
+ LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
+ "res=" + container.getContainer().getResource() +
+ ") from queue " + queue.getName());
+
+ Long time = app.getContainerPreemptionTime(container);
+
+ if (time != null) {
+ // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
+ // proceed with kill
+ if (time + waitTimeBeforeKill < clock.getTime()) {
+ ContainerStatus status =
+ SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
completedContainer(container, status, RMContainerEventType.KILL);
-
- toPreempt = Resources.subtract(toPreempt,
- container.getContainer().getResource());
- if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterCapacity,
- toPreempt, Resources.none())) {
- break;
- }
+ LOG.info("Killing container" + container +
+ " (after waiting for premption for " +
+ (clock.getTime() - time) + "ms)");
}
+ } else {
+ // track the request in the FSSchedulerApp itself
+ app.addPreemption(container, clock.getTime());
}
}
@@ -483,11 +536,11 @@ public class FairScheduler implements Re
return clusterCapacity;
}
- public Clock getClock() {
+ public synchronized Clock getClock() {
return clock;
}
- protected void setClock(Clock clock) {
+ protected synchronized void setClock(Clock clock) {
this.clock = clock;
}
@@ -745,10 +798,18 @@ public class FairScheduler implements Re
LOG.debug("allocate:" +
" applicationAttemptId=" + appAttemptId +
" #ask=" + ask.size());
- }
+ LOG.debug("Preempting " + application.getPreemptionContainers().size()
+ + " container(s)");
+ }
+
+ Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
+ for (RMContainer container : application.getPreemptionContainers()) {
+ preemptionContainerIds.add(container.getContainerId());
+ }
+
return new Allocation(application.pullNewlyAllocatedContainers(),
- application.getHeadroom());
+ application.getHeadroom(), preemptionContainerIds);
}
}
@@ -963,7 +1024,9 @@ public class FairScheduler implements Re
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
-
+ preemptionInterval = this.conf.getPreemptionInterval();
+ waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+
if (!initialized) {
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1480837&r1=1480836&r2=1480837&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
Thu May 9 23:29:38 2013
@@ -55,6 +55,11 @@ public class FairSchedulerConfiguration
/** Whether preemption is enabled. */
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
protected static final boolean DEFAULT_PREEMPTION = false;
+
+ protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
+ protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+ protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
+ protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
/** Whether to assign multiple containers in one check-in. */
protected static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
@@ -123,4 +128,12 @@ public class FairSchedulerConfiguration
return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
}
+
+ public int getPreemptionInterval() {
+ return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+ }
+
+ public int getWaitTimeBeforeKill() {
+ return getInt(WAIT_TIME_BEFORE_KILL, DEFAULT_WAIT_TIME_BEFORE_KILL);
+ }
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1480837&r1=1480836&r2=1480837&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Thu May 9 23:29:38 2013
@@ -30,6 +30,7 @@ import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -900,9 +901,16 @@ public class TestFairScheduler {
*/
public void testChoiceOfPreemptedContainers() throws Exception {
Configuration conf = createConfiguration();
+
+ conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file",
ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
-
+
+ MockClock clock = new MockClock();
+ scheduler.setClock(clock);
+
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
@@ -997,15 +1005,38 @@ public class TestFairScheduler {
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
- assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
+
+ // First verify we are adding containers to preemption list for the application
+ assertTrue(!Collections.disjoint(scheduler.applications.get(app3).getLiveContainers(),
+ scheduler.applications.get(app3).getPreemptionContainers()));
+ assertTrue(!Collections.disjoint(scheduler.applications.get(app6).getLiveContainers(),
+ scheduler.applications.get(app6).getPreemptionContainers()));
+
+ // Pretend 15 seconds have passed
+ clock.tick(15);
+
+ // Trigger a kill by insisting we want containers back
+ scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+ Resources.createResource(2 * 1024));
+
+ // At this point the containers should have been killed (since we are not simulating AM)
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
+ assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
+
+ // Trigger a kill by insisting we want containers back
+ scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
+ Resources.createResource(2 * 1024));
+
+ // Pretend 15 seconds have passed
+ clock.tick(15);
// We should be able to claw back another container from A and B each.
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
+
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());