Author: vinodkv
Date: Tue Mar 18 02:53:20 2014
New Revision: 1578722
URL: http://svn.apache.org/r1578722
Log:
YARN-1512. Enhanced CapacityScheduler to be able to decouple scheduling from
node-heartbeats. Contributed by Arun C Murthy.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1578722&r1=1578721&r2=1578722&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Mar 18 02:53:20 2014
@@ -312,6 +312,9 @@ Release 2.4.0 - UNRELEASED
YARN-1824. Improved NodeManager and clients to be able to handle cross
platform application submissions. (Jian He via vinodkv)
+ YARN-1512. Enhanced CapacityScheduler to be able to decouple scheduling from
+ node-heartbeats. (Arun C Murthy via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml?rev=1578722&r1=1578721&r2=1578722&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml Tue Mar 18 02:53:20 2014
@@ -172,6 +172,12 @@
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
+ <!-- Inconsistent sync warning - scheduleAsynchronously is only initialized once and never changed -->
+ <Match>
+ <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler" />
+ <Field name="scheduleAsynchronously" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
<!-- Inconsistent sync warning - minimumAllocation is only initialized once and never changed -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler" />
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1578722&r1=1578721&r2=1578722&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Tue Mar 18 02:53:20 2014
@@ -21,11 +21,14 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -194,6 +197,18 @@ public class CapacityScheduler extends A
private ResourceCalculator calculator;
private boolean usePortForNodeName;
+ private boolean scheduleAsynchronously;
+ private AsyncScheduleThread asyncSchedulerThread;
+
+ /**
+ * EXPERT
+ */
+ private long asyncScheduleInterval;
+ private static final String ASYNC_SCHEDULER_INTERVAL =
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ + ".scheduling-interval-ms";
+ private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
+
public CapacityScheduler() {}
@Override
@@ -272,11 +287,23 @@ public class CapacityScheduler extends A
initializeQueues(this.conf);
+ scheduleAsynchronously = this.conf.getScheduleAynschronously();
+ asyncScheduleInterval =
+ this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
+ DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+ if (scheduleAsynchronously) {
+ asyncSchedulerThread = new AsyncScheduleThread(this);
+ asyncSchedulerThread.start();
+ }
+
initialized = true;
LOG.info("Initialized CapacityScheduler with " +
"calculator=" + getResourceCalculator().getClass() + ", " +
"minimumAllocation=<" + getMinimumResourceCapability() + ">, " +
- "maximumAllocation=<" + getMaximumResourceCapability() + ">");
+ "maximumAllocation=<" + getMaximumResourceCapability() + ">, " +
+ "asynchronousScheduling=" + scheduleAsynchronously + ", " +
+ "asyncScheduleInterval=" + asyncScheduleInterval + "ms");
+
} else {
CapacitySchedulerConfiguration oldConf = this.conf;
this.conf = loadCapacitySchedulerConfiguration(configuration);
@@ -290,7 +317,69 @@ public class CapacityScheduler extends A
}
}
}
+
+ long getAsyncScheduleInterval() { // package-private accessor; schedule() passes the result to Thread.sleep
+ return asyncScheduleInterval; // read from ASYNC_SCHEDULER_INTERVAL during first-time initialization, in milliseconds
+ }
+
+ private final static Random random = new Random(System.currentTimeMillis());
+
+ /**
+ * Run one asynchronous scheduling pass over the scheduler's nodes and then
+ * sleep for the configured async scheduling interval.
+ *
+ * The pass first walks the nodes from a randomly chosen offset to the end
+ * of the collection, then walks every node once more, so nodes at or after
+ * the offset are offered containers twice in a single pass.
+ *
+ * When no nodes are registered the allocation step is skipped, because
+ * {@code Random.nextInt(0)} would throw IllegalArgumentException. The sleep
+ * still happens so that a caller looping on this method does not spin.
+ *
+ * If the sleep is interrupted the thread's interrupt status is restored so
+ * the caller can observe it.
+ *
+ * @param cs scheduler whose nodes should be offered containers
+ */
+ static void schedule(CapacityScheduler cs) {
+ Collection<FiCaSchedulerNode> nodes = cs.getAllNodes().values();
+ int nodeCount = nodes.size();
+ // Random.nextInt(bound) rejects a non-positive bound, so guard the empty case.
+ if (nodeCount > 0) {
+ // First randomize the start point
+ int current = 0;
+ int start = random.nextInt(nodeCount);
+ for (FiCaSchedulerNode node : nodes) {
+ if (current++ >= start) {
+ cs.allocateContainersToNode(node);
+ }
+ }
+ // Now, just get everyone to be safe
+ for (FiCaSchedulerNode node : nodes) {
+ cs.allocateContainersToNode(node);
+ }
+ }
+ try {
+ Thread.sleep(cs.getAsyncScheduleInterval());
+ } catch (InterruptedException e) {
+ // Do not swallow the interrupt: re-assert it for the calling thread.
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Daemon thread that repeatedly calls {@link #schedule(CapacityScheduler)}
+ * while scheduling is enabled via {@link #beginSchedule()}, and idles in
+ * 100 ms sleeps while it is suspended via {@link #suspendSchedule()}.
+ * The thread starts in the suspended state and exits its loop once it has
+ * been interrupted.
+ */
+ static class AsyncScheduleThread extends Thread {
+
+ private final CapacityScheduler cs;
+ // Toggled by beginSchedule()/suspendSchedule(); polled by run().
+ private final AtomicBoolean runSchedules = new AtomicBoolean(false);
+
+ /**
+ * @param cs scheduler this thread drives; the thread is marked as a daemon
+ */
+ public AsyncScheduleThread(CapacityScheduler cs) {
+ this.cs = cs;
+ setDaemon(true);
+ }
+
+ /**
+ * Loop until interrupted: schedule when enabled, otherwise poll the flag
+ * every 100 ms.
+ */
+ @Override
+ public void run() {
+ // Checking the interrupt status gives owners a way to stop the thread;
+ // schedule() and the idle sleep both restore the flag when interrupted.
+ while (!Thread.currentThread().isInterrupted()) {
+ if (!runSchedules.get()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ schedule(cs);
+ }
+ }
+ }
+
+ /** Enable scheduling passes. */
+ public void beginSchedule() {
+ runSchedules.set(true);
+ }
+
+ /** Pause scheduling passes; the thread keeps running and polling. */
+ public void suspendSchedule() {
+ runSchedules.set(false);
+ }
+
+ }
+
@Private
public static final String ROOT_QUEUE =
CapacitySchedulerConfiguration.PREFIX +
CapacitySchedulerConfiguration.ROOT;
@@ -696,6 +785,9 @@ public class CapacityScheduler extends A
LOG.debug("Node being looked for scheduling " + nm
+ " availableResource: " + node.getAvailableResource());
}
+ }
+
+ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
// Assign new containers...
// 1. Check for reserved applications
@@ -708,7 +800,8 @@ public class CapacityScheduler extends A
// Try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application " +
- reservedApplication.getApplicationId() + " on node: " + nm);
+ reservedApplication.getApplicationId() + " on node: " +
+ node.getNodeID());
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
CSAssignment assignment = queue.assignContainers(clusterResource, node);
@@ -729,9 +822,16 @@ public class CapacityScheduler extends A
// Try to schedule more if there are no reservations to fulfill
if (node.getReservedContainer() == null) {
- root.assignContainers(clusterResource, node);
+ if (Resources.greaterThanOrEqual(calculator, getClusterResources(),
+ node.getAvailableResource(), minimumAllocation)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to schedule on node: " + node.getNodeName() +
+ ", available: " + node.getAvailableResource());
+ }
+ root.assignContainers(clusterResource, node);
+ }
} else {
- LOG.info("Skipping scheduling since node " + nm +
+ LOG.info("Skipping scheduling since node " + node.getNodeID() +
" is reserved by application " +
node.getReservedContainer().getContainerId().getApplicationAttemptId()
);
@@ -772,7 +872,11 @@ public class CapacityScheduler extends A
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent =
(NodeUpdateSchedulerEvent)event;
- nodeUpdate(nodeUpdatedEvent.getRMNode());
+ RMNode node = nodeUpdatedEvent.getRMNode();
+ nodeUpdate(node);
+ if (!scheduleAsynchronously) {
+ allocateContainersToNode(getNode(node.getNodeID()));
+ }
}
break;
case APP_ADDED:
@@ -831,6 +935,10 @@ public class CapacityScheduler extends A
++numNodeManagers;
LOG.info("Added node " + nodeManager.getNodeAddress() +
" clusterResource: " + clusterResource);
+
+ if (scheduleAsynchronously && numNodeManagers == 1) {
+ asyncSchedulerThread.beginSchedule();
+ }
}
private synchronized void removeNode(RMNode nodeInfo) {
@@ -842,6 +950,10 @@ public class CapacityScheduler extends A
root.updateClusterResource(clusterResource);
--numNodeManagers;
+ if (scheduleAsynchronously && numNodeManagers == 0) {
+ asyncSchedulerThread.suspendSchedule();
+ }
+
// Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers();
for (RMContainer container : runningContainers) {
@@ -931,7 +1043,12 @@ public class CapacityScheduler extends A
FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
-
+
+ @Lock(Lock.NoLock.class)
+ Map<NodeId, FiCaSchedulerNode> getAllNodes() { // package-private; schedule() iterates the values of this map
+ return nodes; // hands back the scheduler's own map instance, not a copy
+ }
+
@Override
public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1578722&r1=1578721&r2=1578722&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Tue Mar 18 02:53:20 2014
@@ -135,6 +135,17 @@ public class CapacitySchedulerConfigurat
@Private
public static final int DEFAULT_NODE_LOCALITY_DELAY = -1;
+ @Private
+ public static final String SCHEDULE_ASYNCHRONOUSLY_PREFIX =
+ PREFIX + "schedule-asynchronously";
+
+ @Private
+ public static final String SCHEDULE_ASYNCHRONOUSLY_ENABLE =
+ SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable";
+
+ @Private
+ public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
+
public CapacitySchedulerConfiguration() {
this(new Configuration());
}
@@ -357,4 +368,14 @@ public class CapacitySchedulerConfigurat
resourceCalculatorClass,
ResourceCalculator.class);
}
+
+ public boolean getScheduleAynschronously() { // NOTE(review): name misspells "Asynchronously"; CapacityScheduler calls it with this spelling
+ return getBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, // key is SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable"
+ DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE); // defaults to false when the key is unset
+ }
+
+ public void setScheduleAynschronously(boolean async) { // NOTE(review): same misspelling as the getter; kept as committed
+ setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async); // stores the flag under the same key the getter reads
+ }
+
}
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1578722&r1=1578721&r2=1578722&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Tue Mar 18 02:53:20 2014
@@ -19,20 +19,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
@@ -61,11 +56,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -650,4 +642,30 @@ public class TestCapacityScheduler {
cs.getSchedulerApplications(), cs, "a1");
Assert.assertEquals("a1", app.getQueue().getQueueName());
}
- }
+
+ @Test
+ public void testAsyncScheduling() throws Exception { // smoke test: calls CapacityScheduler.schedule() directly; contains no assertions
+ Configuration conf = new Configuration(); // NOTE(review): the schedule-asynchronously enable key is not set here, so presumably no AsyncScheduleThread is started - confirm
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ MockRM rm = new MockRM(conf);
+ rm.start(); // NOTE(review): rm is never stopped in this test
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+ final int NODES = 100;
+
+ // Register nodes
+ for (int i=0; i < NODES; ++i) {
+ String host = "192.168.1." + i;
+ RMNode node =
+ MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
+ cs.handle(new NodeAddedSchedulerEvent(node));
+ }
+
+ // Now directly exercise the scheduling loop
+ for (int i=0; i < NODES; ++i) {
+ CapacityScheduler.schedule(cs); // each call also sleeps for the scheduler's async interval
+ }
+ }
+
+}