Author: sandy
Date: Tue Oct 1 19:54:50 2013
New Revision: 1528192
URL: http://svn.apache.org/r1528192
Log:
YARN-1010. FairScheduler: decouple container scheduling from nodemanager
heartbeats. (Wei Yan via Sandy Ryza)
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1528192&r1=1528191&r2=1528192&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Oct 1 19:54:50 2013
@@ -26,6 +26,9 @@ Release 2.3.0 - UNRELEASED
YARN-1021. Yarn Scheduler Load Simulator. (ywskycn via tucu)
+ YARN-1010. FairScheduler: decouple container scheduling from nodemanager
+ heartbeats. (Wei Yan via Sandy Ryza)
+
IMPROVEMENTS
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1528192&r1=1528191&r2=1528192&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
Tue Oct 1 19:54:50 2013
@@ -310,10 +310,19 @@ public class AppSchedulable extends Sche
+ localRequest);
}
- NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
- scheduler.getNumClusterNodes(),
scheduler.getNodeLocalityThreshold(),
- scheduler.getRackLocalityThreshold());
-
+ NodeType allowedLocality;
+ if (scheduler.isContinuousSchedulingEnabled()) {
+ allowedLocality = app.getAllowedLocalityLevelByTime(priority,
+ scheduler.getNodeLocalityDelayMs(),
+ scheduler.getRackLocalityDelayMs(),
+ scheduler.getClock().getTime());
+ } else {
+ allowedLocality = app.getAllowedLocalityLevel(priority,
+ scheduler.getNumClusterNodes(),
+ scheduler.getNodeLocalityThreshold(),
+ scheduler.getRackLocalityThreshold());
+ }
+
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& localRequest != null && localRequest.getNumContainers() != 0) {
return assignContainer(node, priority,
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1528192&r1=1528191&r2=1528192&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
Tue Oct 1 19:54:50 2013
@@ -464,7 +464,12 @@ public class FSSchedulerApp extends Sche
* @param priority The priority of the container scheduled.
*/
synchronized public void resetSchedulingOpportunities(Priority priority) {
- lastScheduledContainer.put(priority, System.currentTimeMillis());
+ resetSchedulingOpportunities(priority, System.currentTimeMillis());
+ }
+ /* Used for continuous scheduling: same reset as the single-argument
+  * overload, but the caller supplies the timestamp instead of reading
+  * System.currentTimeMillis(). Records currentTimeMs as the time of the
+  * last scheduled container for this priority and zeroes its
+  * scheduling-opportunity count. */
+ synchronized public void resetSchedulingOpportunities(Priority priority,
+ long currentTimeMs) {
+ lastScheduledContainer.put(priority, currentTimeMs);
schedulingOpportunities.setCount(priority, 0);
}
@@ -513,6 +518,55 @@ public class FSSchedulerApp extends Sche
return allowedLocalityLevel.get(priority);
}
+ /**
+ * Return the level at which we are allowed to schedule containers.
+ * Given the thresholds indicating how much time passed before relaxing
+ * scheduling constraints.
+ */
+ public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
+ long nodeLocalityDelayMs, long rackLocalityDelayMs,
+ long currentTimeMs) {
+
+ // if not being used, can schedule anywhere
+ if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
+ return NodeType.OFF_SWITCH;
+ }
+
+ // default level is NODE_LOCAL
+ if (! allowedLocalityLevel.containsKey(priority)) {
+ allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
+ return NodeType.NODE_LOCAL;
+ }
+
+ NodeType allowed = allowedLocalityLevel.get(priority);
+
+ // if level is already most liberal, we're done
+ if (allowed.equals(NodeType.OFF_SWITCH)) {
+ return NodeType.OFF_SWITCH;
+ }
+
+ // check waiting time
+ long waitTime = currentTimeMs;
+ if (lastScheduledContainer.containsKey(priority)) {
+ waitTime -= lastScheduledContainer.get(priority);
+ } else {
+ waitTime -= appSchedulable.getStartTime();
+ }
+
+ long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
+ nodeLocalityDelayMs : rackLocalityDelayMs;
+
+ if (waitTime > thresholdTime) {
+ if (allowed.equals(NodeType.NODE_LOCAL)) {
+ allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
+ resetSchedulingOpportunities(priority, currentTimeMs);
+ } else if (allowed.equals(NodeType.RACK_LOCAL)) {
+ allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
+ resetSchedulingOpportunities(priority, currentTimeMs);
+ }
+ }
+ return allowedLocalityLevel.get(priority);
+ }
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
Priority priority, ResourceRequest request,
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1528192&r1=1528191&r2=1528192&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
Tue Oct 1 19:54:50 2013
@@ -179,8 +179,12 @@ public class FairScheduler implements Re
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight
adjuster
+ protected boolean continuousSchedulingEnabled; // Continuous Scheduling
enabled or not
+ protected int continuousSchedulingSleepMs; // Sleep time for each pass in
continuous scheduling
protected double nodeLocalityThreshold; // Cluster threshold for node
locality
protected double rackLocalityThreshold; // Cluster threshold for rack
locality
+ protected long nodeLocalityDelayMs; // Delay for node locality
+ protected long rackLocalityDelayMs; // Delay for rack locality
private FairSchedulerEventLog eventLog; // Machine-readable event log
protected boolean assignMultiple; // Allocate multiple containers per
// heartbeat
@@ -582,6 +586,22 @@ public class FairScheduler implements Re
return rackLocalityThreshold;
}
+ public long getNodeLocalityDelayMs() { // node-locality delay in ms; passed to getAllowedLocalityLevelByTime when continuous scheduling is enabled
+ return nodeLocalityDelayMs;
+ }
+
+ public long getRackLocalityDelayMs() { // rack-locality delay in ms; passed to getAllowedLocalityLevelByTime when continuous scheduling is enabled
+ return rackLocalityDelayMs;
+ }
+
+ public boolean isContinuousSchedulingEnabled() { // true when the scheduler was configured with continuous scheduling enabled
+ return continuousSchedulingEnabled;
+ }
+
+ public synchronized int getContinuousSchedulingSleepMs() { // sleep in ms between passes of the continuous-scheduling loop
+ return continuousSchedulingSleepMs;
+ }
+
public Resource getClusterCapacity() {
return clusterCapacity;
}
@@ -907,6 +927,37 @@ public class FairScheduler implements Re
completedContainer, RMContainerEventType.FINISHED);
}
+ if (continuousSchedulingEnabled) {
+ if (!completedContainers.isEmpty()) {
+ attemptScheduling(node);
+ }
+ } else {
+ attemptScheduling(node);
+ }
+ }
+
+  /**
+   * Body of the continuous-scheduling thread. Repeatedly walks all known
+   * nodes and attempts scheduling on every node whose available resource can
+   * fit the minimum allocation, then sleeps for the configured interval.
+   * A failure on one node is logged and does not stop the pass. The loop ends
+   * when the thread is interrupted.
+   */
+  private void continuousScheduling() {
+    while (!Thread.currentThread().isInterrupted()) {
+      for (FSSchedulerNode node : nodes.values()) {
+        try {
+          if (Resources.fitsIn(minimumAllocation,
+              node.getAvailableResource())) {
+            attemptScheduling(node);
+          }
+        } catch (Throwable ex) {
+          // best effort: keep scheduling the remaining nodes
+          LOG.warn("Error while attempting scheduling for node " + node + ": "
+              + ex.toString(), ex);
+        }
+      }
+      try {
+        Thread.sleep(getContinuousSchedulingSleepMs());
+      } catch (InterruptedException e) {
+        LOG.warn("Error while doing sleep in continuous scheduling: " +
+            e.toString(), e);
+        // Restore the interrupt flag and stop; swallowing the interrupt
+        // would leave no way to shut this thread down.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+ private synchronized void attemptScheduling(FSSchedulerNode node) {
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
@@ -914,19 +965,18 @@ public class FairScheduler implements Re
AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
Priority reservedPriority =
node.getReservedContainer().getReservedPriority();
- if (reservedAppSchedulable != null &&
- !reservedAppSchedulable.hasContainerForNode(reservedPriority, node))
{
+ if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node))
{
// Don't hold the reservation if app can no longer use it
LOG.info("Releasing reservation that cannot be satisfied for
application "
+ reservedAppSchedulable.getApp().getApplicationAttemptId()
- + " on node " + nm);
+ + " on node " + node);
reservedAppSchedulable.unreserve(reservedPriority, node);
reservedAppSchedulable = null;
} else {
// Reservation exists; try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application "
+ reservedAppSchedulable.getApp().getApplicationAttemptId()
- + " on node: " + nm);
+ + " on node: " + node);
node.getReservedAppSchedulable().assignReservedContainer(node);
}
@@ -1060,8 +1110,13 @@ public class FairScheduler implements Re
maximumAllocation = this.conf.getMaximumAllocation();
incrAllocation = this.conf.getIncrementAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
+ continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+ continuousSchedulingSleepMs =
+ this.conf.getContinuousSchedulingSleepMs();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+ nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+ rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
preemptionEnabled = this.conf.getPreemptionEnabled();
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
@@ -1088,6 +1143,21 @@ public class FairScheduler implements Re
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
updateThread.start();
+
+ if (continuousSchedulingEnabled) {
+ // start continuous scheduling thread
+ Thread schedulingThread = new Thread(
+ new Runnable() {
+ @Override
+ public void run() {
+ continuousScheduling();
+ }
+ }
+ );
+ schedulingThread.setName("ContinuousScheduling");
+ schedulingThread.setDaemon(true);
+ schedulingThread.start();
+ }
} else {
try {
queueMgr.reloadAllocs();
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1528192&r1=1528191&r2=1528192&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
Tue Oct 1 19:54:50 2013
@@ -66,6 +66,22 @@ public class FairSchedulerConfiguration
protected static final float DEFAULT_LOCALITY_THRESHOLD_RACK =
DEFAULT_LOCALITY_THRESHOLD;
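+ /** Delay in ms before node locality is relaxed under continuous scheduling; a negative value (the default) disables time-based delay scheduling. */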
+ protected static final String LOCALITY_DELAY_NODE_MS = CONF_PREFIX +
"locality-delay-node-ms";
+ protected static final long DEFAULT_LOCALITY_DELAY_NODE_MS = -1L;
+
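+ /** Delay in ms before rack locality is relaxed under continuous scheduling; a negative value (the default) disables time-based delay scheduling. */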
+ protected static final String LOCALITY_DELAY_RACK_MS = CONF_PREFIX +
"locality-delay-rack-ms";
+ protected static final long DEFAULT_LOCALITY_DELAY_RACK_MS = -1L;
+
+ /** Enable continuous scheduling or not. */
+ protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX +
"continuous-scheduling-enabled";
+ protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
+
+ /** Sleep time in ms between passes of continuous scheduling (5 ms by default). */
+ protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX +
"continuous-scheduling-sleep-ms";
+ protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
+
/** Whether preemption is enabled. */
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
protected static final boolean DEFAULT_PREEMPTION = false;
@@ -134,6 +150,22 @@ public class FairSchedulerConfiguration
return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
}
+ public boolean isContinuousSchedulingEnabled() { // reads CONTINUOUS_SCHEDULING_ENABLED, falling back to DEFAULT_CONTINUOUS_SCHEDULING_ENABLED (false)
+ return getBoolean(CONTINUOUS_SCHEDULING_ENABLED,
DEFAULT_CONTINUOUS_SCHEDULING_ENABLED);
+ }
+
+ public int getContinuousSchedulingSleepMs() { // reads CONTINUOUS_SCHEDULING_SLEEP_MS, falling back to DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS (5)
+ return getInt(CONTINUOUS_SCHEDULING_SLEEP_MS,
DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
+ }
+
+ public long getLocalityDelayNodeMs() { // reads LOCALITY_DELAY_NODE_MS, falling back to DEFAULT_LOCALITY_DELAY_NODE_MS (-1)
+ return getLong(LOCALITY_DELAY_NODE_MS, DEFAULT_LOCALITY_DELAY_NODE_MS);
+ }
+
+ public long getLocalityDelayRackMs() { // reads LOCALITY_DELAY_RACK_MS, falling back to DEFAULT_LOCALITY_DELAY_RACK_MS (-1)
+ return getLong(LOCALITY_DELAY_RACK_MS, DEFAULT_LOCALITY_DELAY_RACK_MS);
+ }
+
public boolean getPreemptionEnabled() {
return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java?rev=1528192&r1=1528191&r2=1528192&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
Tue Oct 1 19:54:50 2013
@@ -25,11 +25,25 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFSSchedulerApp {
+  /**
+   * Test clock whose time only moves when {@link #tick(int)} is called.
+   * Starts at 0 ms.
+   */
+  private class MockClock implements Clock {
+    private long time = 0;
+
+    /** @return the current mock time in milliseconds */
+    @Override
+    public long getTime() {
+      return time;
+    }
+
+    /**
+     * Advances the clock.
+     *
+     * @param seconds number of seconds to move forward
+     */
+    public void tick(int seconds) {
+      // multiply as long so a large argument cannot overflow int arithmetic
+      time = time + seconds * 1000L;
+    }
+
+  }
+
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
ApplicationAttemptId attId =
@@ -94,6 +108,63 @@ public class TestFSSchedulerApp {
}
@Test
+ public void testDelaySchedulingForContinuousScheduling() // checks time-based relaxation NODE_LOCAL -> RACK_LOCAL -> OFF_SWITCH using a mock clock
+ throws InterruptedException {
+ Queue queue = Mockito.mock(Queue.class);
+ Priority prio = Mockito.mock(Priority.class);
+ Mockito.when(prio.getPriority()).thenReturn(1);
+
+ MockClock clock = new MockClock();
+
+ long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds
+ long rackLocalityDelayMs = 6 * 1000L; // 6 seconds
+
+ ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+ FSSchedulerApp schedulerApp =
+ new FSSchedulerApp(applicationAttemptId, "user1", queue,
+ null, null);
+ AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class);
+ long startTime = clock.getTime(); // mock clock starts at 0, so the app "starts" at time 0
+ Mockito.when(appSchedulable.getStartTime()).thenReturn(startTime);
+ schedulerApp.setAppSchedulable(appSchedulable);
+
+ // Default level should be node-local (first query for this priority)
+ assertEquals(NodeType.NODE_LOCAL,
+ schedulerApp.getAllowedLocalityLevelByTime(prio,
+ nodeLocalityDelayMs, rackLocalityDelayMs,
clock.getTime()));
+
+ // after 4 seconds (below the 5 s node delay) should remain node local
+ clock.tick(4);
+ assertEquals(NodeType.NODE_LOCAL,
+ schedulerApp.getAllowedLocalityLevelByTime(prio,
+ nodeLocalityDelayMs, rackLocalityDelayMs,
clock.getTime()));
+
+ // after 6 seconds in total (above the 5 s node delay) should switch to rack local
+ clock.tick(2);
+ assertEquals(NodeType.RACK_LOCAL,
+ schedulerApp.getAllowedLocalityLevelByTime(prio,
+ nodeLocalityDelayMs, rackLocalityDelayMs,
clock.getTime()));
+
+ // manually set back to node local and restart the wait at the current clock time
+ schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL);
+ schedulerApp.resetSchedulingOpportunities(prio, clock.getTime());
+ assertEquals(NodeType.NODE_LOCAL,
+ schedulerApp.getAllowedLocalityLevelByTime(prio,
+ nodeLocalityDelayMs, rackLocalityDelayMs,
clock.getTime()));
+
+ // Now escalate again to rack-local (6 s wait > 5 s node delay), then to off-switch (7 s wait > 6 s rack delay)
+ clock.tick(6);
+ assertEquals(NodeType.RACK_LOCAL,
+ schedulerApp.getAllowedLocalityLevelByTime(prio,
+ nodeLocalityDelayMs, rackLocalityDelayMs,
clock.getTime()));
+
+ clock.tick(7);
+ assertEquals(NodeType.OFF_SWITCH,
+ schedulerApp.getAllowedLocalityLevelByTime(prio,
+ nodeLocalityDelayMs, rackLocalityDelayMs,
clock.getTime()));
+ }
+
+ @Test
/**
* Ensure that when negative parameters are given (signaling delay scheduling
* not in use), the least restrictive locality level is returned.
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1528192&r1=1528191&r2=1528192&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Tue Oct 1 19:54:50 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -298,6 +299,14 @@ public class TestFairScheduler {
conf.setBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT, true);
conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, .5);
conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7);
+ conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
+ true);
+ conf.setInt(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS,
+ 10);
+ conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS,
+ 5000);
+ conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS,
+ 5000);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
@@ -308,6 +317,11 @@ public class TestFairScheduler {
Assert.assertEquals(true, scheduler.sizeBasedWeight);
Assert.assertEquals(.5, scheduler.nodeLocalityThreshold, .01);
Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01);
+ Assert.assertTrue("The continuous scheduling should be enabled",
+ scheduler.continuousSchedulingEnabled);
+ Assert.assertEquals(10, scheduler.continuousSchedulingSleepMs);
+ Assert.assertEquals(5000, scheduler.nodeLocalityDelayMs);
+ Assert.assertEquals(5000, scheduler.rackLocalityDelayMs);
Assert.assertEquals(1024,
scheduler.getMaximumResourceCapability().getMemory());
Assert.assertEquals(512,
scheduler.getMinimumResourceCapability().getMemory());
Assert.assertEquals(128,
@@ -2255,4 +2269,47 @@ public class TestFairScheduler {
fs.applications, FSSchedulerApp.class);
}
+  /**
+   * Checks that with continuous scheduling enabled a pending request gets a
+   * container from the background scheduling thread: the test adds a node and
+   * submits a request but never delivers a node heartbeat.
+   */
+  @Test
+  public void testContinuousScheduling() throws Exception {
+    // set continuous scheduling enabled
+    FairScheduler fs = new FairScheduler();
+    Configuration conf = createConfiguration();
+    conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
+        true);
+    fs.reinitialize(conf, resourceManager.getRMContext());
+    Assert.assertTrue("Continuous scheduling should be enabled.",
+        fs.isContinuousSchedulingEnabled());
+
+    // Add one node
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    fs.handle(nodeEvent1);
+
+    // available resource (expected value first, so failure messages read
+    // correctly)
+    Assert.assertEquals(8 * 1024, fs.getClusterCapacity().getMemory());
+    Assert.assertEquals(8, fs.getClusterCapacity().getVirtualCores());
+
+    // send application request
+    ApplicationAttemptId appAttemptId =
+        createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
+    fs.addApplication(appAttemptId, "queue11", "user11");
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest request =
+        createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true);
+    ask.add(request);
+    fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+
+    // waiting for continuous_scheduler_sleep_time
+    // at least one pass
+    // NOTE(review): relies on a wall-clock sleep, so it may be
+    // timing-sensitive on a heavily loaded machine.
+    Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
+
+    // check consumption
+    Resource consumption =
+        fs.applications.get(appAttemptId).getCurrentConsumption();
+    Assert.assertEquals(1024, consumption.getMemory());
+    Assert.assertEquals(1, consumption.getVirtualCores());
+  }
+
}