Author: sandy
Date: Thu Oct 31 22:07:09 2013
New Revision: 1537735
URL: http://svn.apache.org/r1537735
Log:
YARN-1290. Let continuous scheduling achieve more balanced task assignment
(Wei Yan via Sandy Ryza)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1537735&r1=1537734&r2=1537735&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Oct 31
22:07:09 2013
@@ -59,6 +59,9 @@ Release 2.3.0 - UNRELEASED
applications so that clients can get information about them post
RM-restart.
(Jian He via vinodkv)
+ YARN-1290. Let continuous scheduling achieve more balanced task assignment
+ (Wei Yan via Sandy Ryza)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1537735&r1=1537734&r2=1537735&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
Thu Oct 31 22:07:09 2013
@@ -181,6 +181,8 @@ public class FairScheduler implements Re
protected WeightAdjuster weightAdjuster; // Can be null for no weight
adjuster
protected boolean continuousSchedulingEnabled; // Continuous Scheduling
enabled or not
protected int continuousSchedulingSleepMs; // Sleep time for each pass in
continuous scheduling
+ private Comparator nodeAvailableResourceComparator =
+ new NodeAvailableResourceComparator(); // Node available resource
comparator
protected double nodeLocalityThreshold; // Cluster threshold for node
locality
protected double rackLocalityThreshold; // Cluster threshold for rack
locality
protected long nodeLocalityDelayMs; // Delay for node locality
@@ -948,14 +950,22 @@ public class FairScheduler implements Re
private void continuousScheduling() {
while (true) {
- for (FSSchedulerNode node : nodes.values()) {
- try {
- if (Resources.fitsIn(minimumAllocation,
node.getAvailableResource())) {
- attemptScheduling(node);
+ List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+ Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+
+ // iterate all nodes
+ for (NodeId nodeId : nodeIdList) {
+ if (nodes.containsKey(nodeId)) {
+ FSSchedulerNode node = nodes.get(nodeId);
+ try {
+ if (Resources.fitsIn(minimumAllocation,
+ node.getAvailableResource())) {
+ attemptScheduling(node);
+ }
+ } catch (Throwable ex) {
+ LOG.warn("Error while attempting scheduling for node " + node +
+ ": " + ex.toString(), ex);
}
- } catch (Throwable ex) {
- LOG.warn("Error while attempting scheduling for node " + node + ": "
+
- ex.toString(), ex);
}
}
try {
@@ -966,6 +976,17 @@ public class FairScheduler implements Re
}
}
}
+
+ /** Orders node ids by their nodes' available resource; n2's resource is passed ahead of n1's, presumably so nodes with more free resource sort first (confirm against RESOURCE_CALCULATOR.compare). */
+ private class NodeAvailableResourceComparator implements Comparator<NodeId> {
+
+ @Override
+ public int compare(NodeId n1, NodeId n2) {
+ return RESOURCE_CALCULATOR.compare(clusterCapacity,
+ nodes.get(n2).getAvailableResource(),
+ nodes.get(n1).getAvailableResource());
+ }
+ }
private synchronized void attemptScheduling(FSSchedulerNode node) {
// Assign new containers...
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1537735&r1=1537734&r2=1537735&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Thu Oct 31 22:07:09 2013
@@ -33,6 +33,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -53,6 +56,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -2348,7 +2352,7 @@ public class TestFairScheduler {
fs.applications, FSSchedulerApp.class);
}
- @Test (timeout = 5000)
+ @Test (timeout = 10000)
public void testContinuousScheduling() throws Exception {
// set continuous scheduling enabled
FairScheduler fs = new FairScheduler();
@@ -2359,16 +2363,21 @@ public class TestFairScheduler {
Assert.assertTrue("Continuous scheduling should be enabled.",
fs.isContinuousSchedulingEnabled());
- // Add one node
+ // Add two nodes
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
fs.handle(nodeEvent1);
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ fs.handle(nodeEvent2);
// available resource
- Assert.assertEquals(fs.getClusterCapacity().getMemory(), 8 * 1024);
- Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 8);
+ Assert.assertEquals(fs.getClusterCapacity().getMemory(), 16 * 1024);
+ Assert.assertEquals(fs.getClusterCapacity().getVirtualCores(), 16);
// send application request
ApplicationAttemptId appAttemptId =
@@ -2387,10 +2396,32 @@ public class TestFairScheduler {
FSSchedulerApp app = fs.applications.get(appAttemptId);
// Wait until app gets resources.
while (app.getCurrentConsumption().equals(Resources.none())) { }
-
+
// check consumption
Assert.assertEquals(1024, app.getCurrentConsumption().getMemory());
Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores());
+
+ // another request
+ request =
+ createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true);
+ ask.clear();
+ ask.add(request);
+ fs.allocate(appAttemptId, ask, new ArrayList<ContainerId>(), null, null);
+
+ // Wait until app gets resources
+ while (app.getCurrentConsumption()
+ .equals(Resources.createResource(1024, 1))) { }
+
+ Assert.assertEquals(2048, app.getCurrentConsumption().getMemory());
+ Assert.assertEquals(2, app.getCurrentConsumption().getVirtualCores());
+
+ // 2 containers should be assigned to 2 nodes
+ Set<NodeId> nodes = new HashSet<NodeId>();
+ Iterator<RMContainer> it = app.getLiveContainers().iterator();
+ while (it.hasNext()) {
+ nodes.add(it.next().getContainer().getNodeId());
+ }
+ Assert.assertEquals(2, nodes.size());
}