Author: tucu
Date: Thu Apr 18 18:13:36 2013
New Revision: 1469511
URL: http://svn.apache.org/r1469511
Log:
YARN-482. FS: Extend SchedulingMode to intermediate queues. (kkambatl via tucu)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
- copied unchanged from r1469506,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/
- copied from r1469506,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
- copied unchanged from r1469506,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
- copied unchanged from r1469506,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
- copied unchanged from r1469506,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
Removed:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Apr 18
18:13:36 2013
@@ -23,6 +23,9 @@ Release 2.0.5-beta - UNRELEASED
NEW FEATURES
+ YARN-482. FS: Extend SchedulingMode to intermediate queues.
+ (kkambatl via tucu)
+
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
Thu Apr 18 18:13:36 2013
@@ -278,9 +278,7 @@ public class AppSchedulable extends Sche
}
}
-
- @Override
- public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
LOG.info("Node offered to app: " + getName() + " reserved: " + reserved);
if (reserved) {
@@ -345,4 +343,13 @@ public class AppSchedulable extends Sche
}
return Resources.none();
}
+
+ public Resource assignReservedContainer(FSSchedulerNode node) {
+ return assignContainer(node, true);
+ }
+
+ @Override
+ public Resource assignContainer(FSSchedulerNode node) {
+ return assignContainer(node, false);
+ }
}
\ No newline at end of file
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
Thu Apr 18 18:13:36 2013
@@ -40,9 +40,6 @@ public class FSLeafQueue extends FSQueue
private final List<AppSchedulable> appScheds =
new ArrayList<AppSchedulable>();
-
- /** Scheduling mode for jobs inside the queue (fair or FIFO) */
- private SchedulingMode schedulingMode;
private final FairScheduler scheduler;
private final QueueManager queueMgr;
@@ -86,13 +83,18 @@ public class FSLeafQueue extends FSQueue
return appScheds;
}
- public void setSchedulingMode(SchedulingMode mode) {
- this.schedulingMode = mode;
+ @Override
+ public void setPolicy(SchedulingPolicy policy)
+ throws AllocationConfigurationException {
+ if (!SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF))
{
+ throwPolicyDoesnotApplyException(policy);
+ }
+ super.policy = policy;
}
@Override
- public void recomputeFairShares() {
- schedulingMode.computeShares(getAppSchedulables(), getFairShare());
+ public void recomputeShares() {
+ policy.computeShares(getAppSchedulables(), getFairShare());
}
@Override
@@ -136,42 +138,27 @@ public class FSLeafQueue extends FSQueue
}
@Override
- public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
- LOG.debug("Node offered to queue: " + getName() + " reserved: " +
reserved);
- // If this queue is over its limit, reject
- if (Resources.greaterThan(getResourceUsage(),
- queueMgr.getMaxResources(getName()))) {
- return Resources.none();
+ public Resource assignContainer(FSSchedulerNode node) {
+ Resource assigned = Resources.none();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node offered to queue: " + getName());
}
- // If this node already has reserved resources for an app, first try to
- // finish allocating resources for that app.
- if (reserved) {
- for (AppSchedulable sched : appScheds) {
- if (sched.getApp().getApplicationAttemptId() ==
- node.getReservedContainer().getApplicationAttemptId()) {
- return sched.assignContainer(node, reserved);
- }
- }
- return Resources.none(); // We should never get here
+ if (!assignContainerPreCheck(node)) {
+ return assigned;
}
- // Otherwise, chose app to schedule based on given policy.
- else {
- Comparator<Schedulable> comparator = schedulingMode.getComparator();
-
- Collections.sort(appScheds, comparator);
- for (AppSchedulable sched: appScheds) {
- if (sched.getRunnable()) {
- Resource assignedResource = sched.assignContainer(node, reserved);
- if (!assignedResource.equals(Resources.none())) {
- return assignedResource;
- }
+ Comparator<Schedulable> comparator = policy.getComparator();
+ Collections.sort(appScheds, comparator);
+ for (AppSchedulable sched : appScheds) {
+ if (sched.getRunnable()) {
+ assigned = sched.assignContainer(node);
+ if (Resources.greaterThan(assigned, Resources.none())) {
+ break;
}
}
-
- return Resources.none();
}
+ return assigned;
}
@Override
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
Thu Apr 18 18:13:36 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -33,7 +34,6 @@ public class FSParentQueue extends FSQue
private static final Log LOG = LogFactory.getLog(
FSParentQueue.class.getName());
-
private final List<FSQueue> childQueues =
new ArrayList<FSQueue>();
private final QueueManager queueMgr;
@@ -50,11 +50,11 @@ public class FSParentQueue extends FSQue
}
@Override
- public void recomputeFairShares() {
- SchedulingMode.getDefault().computeShares(childQueues, getFairShare());
+ public void recomputeShares() {
+ policy.computeShares(childQueues, getFairShare());
for (FSQueue childQueue : childQueues) {
childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
- childQueue.recomputeFairShares();
+ childQueue.recomputeShares();
}
}
@@ -131,13 +131,41 @@ public class FSParentQueue extends FSQue
}
@Override
- public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
- throw new IllegalStateException(
- "Parent queue should not be assigned container");
+ public Resource assignContainer(FSSchedulerNode node) {
+ Resource assigned = Resources.none();
+
+ // If this queue is over its limit, reject
+ if (Resources.greaterThan(getResourceUsage(),
+ queueMgr.getMaxResources(getName()))) {
+ return assigned;
+ }
+
+ Collections.sort(childQueues, policy.getComparator());
+ for (FSQueue child : childQueues) {
+ assigned = child.assignContainer(node);
+ if (node.getReservedContainer() != null
+ || Resources.greaterThan(assigned, Resources.none())) {
+ break;
+ }
+ }
+ return assigned;
}
@Override
public Collection<FSQueue> getChildQueues() {
return childQueues;
}
+
+ @Override
+ public void setPolicy(SchedulingPolicy policy)
+ throws AllocationConfigurationException {
+ boolean allowed =
+ SchedulingPolicy.isApplicableTo(policy, (this == queueMgr
+ .getRootQueue()) ? SchedulingPolicy.DEPTH_ROOT
+ : SchedulingPolicy.DEPTH_INTERMEDIATE);
+ if (!allowed) {
+ throwPolicyDoesnotApplyException(policy);
+ }
+ super.policy = policy;
+ }
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
Thu Apr 18 18:13:36 2013
@@ -45,6 +45,8 @@ public abstract class FSQueue extends Sc
protected final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
+ protected SchedulingPolicy policy = SchedulingPolicy.getDefault();
+
public FSQueue(String name, QueueManager queueMgr,
FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
@@ -63,6 +65,19 @@ public abstract class FSQueue extends Sc
return name;
}
+ public SchedulingPolicy getPolicy() {
+ return policy;
+ }
+
+ protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy)
+ throws AllocationConfigurationException {
+ throw new AllocationConfigurationException("SchedulingPolicy " + policy
+ + " does not apply to queue " + getName());
+ }
+
+ public abstract void setPolicy(SchedulingPolicy policy)
+ throws AllocationConfigurationException;
+
@Override
public double getWeight() {
return queueMgr.getQueueWeight(getName());
@@ -130,13 +145,27 @@ public abstract class FSQueue extends Sc
}
/**
- * Recomputes the fair shares for all queues and applications
- * under this queue.
+ * Recomputes the shares for all child queues and applications based on this
+ * queue's current share
*/
- public abstract void recomputeFairShares();
+ public abstract void recomputeShares();
/**
* Gets the children of this queue, if any.
*/
public abstract Collection<FSQueue> getChildQueues();
+
+ /**
+ * Helper method to check if the queue should attempt assigning resources
+ *
+ * @return true if check passes (can assign) or false otherwise
+ */
+ protected boolean assignContainerPreCheck(FSSchedulerNode node) {
+ if (Resources.greaterThan(getResourceUsage(),
+ queueMgr.getMaxResources(getName()))
+ || node.getReservedContainer() != null) {
+ return false;
+ }
+ return true;
+ }
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
Thu Apr 18 18:13:36 2013
@@ -52,6 +52,7 @@ public class FSSchedulerNode extends Sch
private volatile int numContainers;
private RMContainer reservedContainer;
+ private AppSchedulable reservedAppSchedulable;
/* set of containers that are allocated containers */
private final Map<ContainerId, RMContainer> launchedContainers =
@@ -221,6 +222,7 @@ public class FSSchedulerNode extends Sch
" on node " + this + " for application " + application);
}
this.reservedContainer = reservedContainer;
+ this.reservedAppSchedulable = application.getAppSchedulable();
}
public synchronized void unreserveResource(
@@ -237,11 +239,15 @@ public class FSSchedulerNode extends Sch
" on node " + this);
}
- reservedContainer = null;
+ this.reservedContainer = null;
+ this.reservedAppSchedulable = null;
}
public synchronized RMContainer getReservedContainer() {
return reservedContainer;
}
+ public synchronized AppSchedulable getReservedAppSchedulable() {
+ return reservedAppSchedulable;
+ }
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
Thu Apr 18 18:13:36 2013
@@ -161,7 +161,7 @@ public class FairScheduler implements Re
protected boolean assignMultiple; // Allocate multiple containers per
// heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
-
+
public FairScheduler() {
clock = new SystemClock();
queueMgr = new QueueManager(this);
@@ -217,7 +217,7 @@ public class FairScheduler implements Re
rootQueue.setFairShare(clusterCapacity);
// Recursively compute fair shares for all queues
// and update metrics
- rootQueue.recomputeFairShares();
+ rootQueue.recomputeShares();
// Update recorded capacity of root queue (child queues are updated
// when fair share is calculated).
@@ -786,39 +786,24 @@ public class FairScheduler implements Re
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
- // If we have have an application that has reserved a resource on this node
- // already, we try to complete the reservation.
- RMContainer reservedContainer = node.getReservedContainer();
- if (reservedContainer != null) {
- FSSchedulerApp reservedApplication =
- applications.get(reservedContainer.getApplicationAttemptId());
+ AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
+ if (reservedAppSchedulable != null) {
+ // Reservation exists; try to fulfill the reservation
+ LOG.info("Trying to fulfill reservation for application "
+ + reservedAppSchedulable.getApp().getApplicationAttemptId()
+ + " on node: " + nm);
- // Try to fulfill the reservation
- LOG.info("Trying to fulfill reservation for application " +
- reservedApplication.getApplicationId() + " on node: " + nm);
-
- FSLeafQueue queue =
queueMgr.getLeafQueue(reservedApplication.getQueueName());
- queue.assignContainer(node, true);
+ node.getReservedAppSchedulable().assignReservedContainer(node);
}
-
- // Otherwise, schedule at queue which is furthest below fair share
else {
+ // No reservation, schedule at queue which is farthest below fair share
int assignedContainers = 0;
while (node.getReservedContainer() == null) {
- // At most one task is scheduled each iteration of this loop
- List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
- queueMgr.getLeafQueues());
- Collections.sort(scheds, SchedulingMode.getDefault().getComparator());
boolean assignedContainer = false;
- for (FSLeafQueue sched : scheds) {
- Resource assigned = sched.assignContainer(node, false);
- if (Resources.greaterThan(assigned, Resources.none()) ||
- node.getReservedContainer() != null) {
- eventLog.log("ASSIGN", nm.getHostName(), assigned);
- assignedContainers++;
- assignedContainer = true;
- break;
- }
+ if (Resources.greaterThan(
+ queueMgr.getRootQueue().assignContainer(node),
+ Resources.none())) {
+ assignedContainer = true;
}
if (!assignedContainer) { break; }
if (!assignMultiple) { break; }
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
Thu Apr 18 18:13:36 2013
@@ -143,7 +143,6 @@ public class QueueManager {
if (leafQueue == null) {
return null;
}
- leafQueue.setSchedulingMode(info.defaultSchedulingMode);
queue = leafQueue;
} else if (queue instanceof FSParentQueue) {
return null;
@@ -302,7 +301,7 @@ public class QueueManager {
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
Map<String, Double> queueWeights = new HashMap<String, Double>();
- Map<String, SchedulingMode> queueModes = new HashMap<String,
SchedulingMode>();
+ Map<String, SchedulingPolicy> queuePolicies = new HashMap<String,
SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>();
@@ -310,7 +309,7 @@ public class QueueManager {
int queueMaxAppsDefault = Integer.MAX_VALUE;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
- SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault();
+ SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
// Remember all queue names so we can display them on web UI, etc.
List<String> queueNamesInAllocFile = new ArrayList<String>();
@@ -339,7 +338,7 @@ public class QueueManager {
if ("queue".equals(element.getTagName()) ||
"pool".equals(element.getTagName())) {
loadQueue("root", element, minQueueResources, maxQueueResources,
queueMaxApps,
- userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+ userMaxApps, queueWeights, queuePolicies,
minSharePreemptionTimeouts,
queueAcls, queueNamesInAllocFile);
} else if ("user".equals(element.getTagName())) {
String userName = element.getAttribute("name");
@@ -370,11 +369,12 @@ public class QueueManager {
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
- queueMaxAppsDefault = val;}
- else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
+ queueMaxAppsDefault = val;
+ } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
+ || "defaultQueueSchedulingMode".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
- SchedulingMode.setDefault(text);
- defaultSchedulingMode = SchedulingMode.getDefault();
+ SchedulingPolicy.setDefault(text);
+ defaultSchedPolicy = SchedulingPolicy.getDefault();
} else {
LOG.warn("Bad element in allocations file: " + element.getTagName());
}
@@ -385,7 +385,7 @@ public class QueueManager {
synchronized (this) {
info = new QueueManagerInfo(minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
- queueMaxAppsDefault, defaultSchedulingMode,
minSharePreemptionTimeouts,
+ queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout,
defaultMinSharePreemptionTimeout);
// Root queue should have empty ACLs. As a queue's ACL is the union of
@@ -396,14 +396,15 @@ public class QueueManager {
rootAcls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(" "));
rootAcls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(" "));
queueAcls.put(ROOT_QUEUE, rootAcls);
-
+
+ // Create all queus
for (String name: queueNamesInAllocFile) {
- FSLeafQueue queue = getLeafQueue(name);
- if (queueModes.containsKey(name)) {
- queue.setSchedulingMode(queueModes.get(name));
- } else {
- queue.setSchedulingMode(defaultSchedulingMode);
- }
+ getLeafQueue(name);
+ }
+
+ // Set custom policies as specified
+ for (Map.Entry<String, SchedulingPolicy> entry :
queuePolicies.entrySet()) {
+ queues.get(entry.getKey()).setPolicy(entry.getValue());
}
}
}
@@ -414,7 +415,8 @@ public class QueueManager {
private void loadQueue(String parentName, Element element, Map<String,
Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer>
queueMaxApps,
Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
- Map<String, SchedulingMode> queueModes, Map<String, Long>
minSharePreemptionTimeouts,
+ Map<String, SchedulingPolicy> queuePolicies,
+ Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String>
queueNamesInAllocFile)
throws AllocationConfigurationException {
String queueName = parentName + "." + element.getAttribute("name");
@@ -448,9 +450,10 @@ public class QueueManager {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
minSharePreemptionTimeouts.put(queueName, val);
- } else if ("schedulingMode".equals(field.getTagName())) {
+ } else if ("schedulingPolicy".equals(field.getTagName())
+ || "schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
- queueModes.put(queueName, SchedulingMode.parse(text));
+ queuePolicies.put(queueName, SchedulingPolicy.parse(text));
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
@@ -459,8 +462,9 @@ public class QueueManager {
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
} else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) {
- loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps,
- userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+ loadQueue(queueName, field, minQueueResources, maxQueueResources,
+ queueMaxApps, userMaxApps, queueWeights, queuePolicies,
+ minSharePreemptionTimeouts,
queueAcls, queueNamesInAllocFile);
isLeaf = false;
}
@@ -615,13 +619,13 @@ public class QueueManager {
// below half its fair share for this long, it is allowed to preempt tasks.
public final long fairSharePreemptionTimeout;
- public final SchedulingMode defaultSchedulingMode;
+ public final SchedulingPolicy defaultSchedulingPolicy;
public QueueManagerInfo(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
Map<String, Double> queueWeights, int userMaxAppsDefault,
- int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode,
+ int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
long fairSharePreemptionTimeout, long
defaultMinSharePreemptionTimeout) {
@@ -632,7 +636,7 @@ public class QueueManager {
this.queueWeights = queueWeights;
this.userMaxAppsDefault = userMaxAppsDefault;
this.queueMaxAppsDefault = queueMaxAppsDefault;
- this.defaultSchedulingMode = defaultSchedulingMode;
+ this.defaultSchedulingPolicy = defaultSchedulingPolicy;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
this.queueAcls = queueAcls;
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
@@ -651,7 +655,7 @@ public class QueueManager {
minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
fairSharePreemptionTimeout = Long.MAX_VALUE;
- defaultSchedulingMode = SchedulingMode.getDefault();
+ defaultSchedulingPolicy = SchedulingPolicy.getDefault();
}
}
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
Thu Apr 18 18:13:36 2013
@@ -93,11 +93,9 @@ public abstract class Schedulable {
/**
* Assign a container on this node if possible, and return the amount of
- * resources assigned. If {@code reserved} is true, it means a reservation
- * already exists on this node, and the schedulable should fulfill that
- * reservation if possible.
+ * resources assigned.
*/
- public abstract Resource assignContainer(FSSchedulerNode node, boolean
reserved);
+ public abstract Resource assignContainer(FSSchedulerNode node);
/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare) {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
Thu Apr 18 18:13:36 2013
@@ -68,7 +68,7 @@ public class FakeSchedulable extends Sch
}
@Override
- public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+ public Resource assignContainer(FSSchedulerNode node) {
return null;
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
Thu Apr 18 18:13:36 2013
@@ -24,7 +24,7 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.junit.Before;
import org.junit.Test;
@@ -33,12 +33,12 @@ import org.junit.Test;
*/
public class TestComputeFairShares {
private List<Schedulable> scheds;
- private SchedulingMode schedulingMode;
+ private SchedulingPolicy schedulingMode;
@Before
public void setUp() throws Exception {
scheds = new ArrayList<Schedulable>();
- schedulingMode = new FairSchedulingMode();
+ schedulingMode = new FairSharePolicy();
}
/**
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1469511&r1=1469510&r2=1469511&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Thu Apr 18 18:13:36 2013
@@ -73,7 +73,7 @@ import org.apache.hadoop.yarn.server.res
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Before;
@@ -284,7 +284,7 @@ public class TestFairScheduler {
assertEquals(capacity / 4, queue2.getFairShare().getMemory());
assertEquals(capacity / 4, queue3.getFairShare().getMemory());
}
-
+
@Test
public void testHierarchicalQueuesSimilarParents() {
QueueManager queueManager = scheduler.getQueueManager();
@@ -1359,7 +1359,7 @@ public class TestFairScheduler {
FSSchedulerApp app2 = scheduler.applications.get(attId2);
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1");
- queue1.setSchedulingMode(new FifoSchedulingMode());
+ queue1.setPolicy(new FifoPolicy());
scheduler.update();
@@ -1381,7 +1381,80 @@ public class TestFairScheduler {
assertEquals(2, app1.getLiveContainers().size());
assertEquals(1, app2.getLiveContainers().size());
}
-
+
+ /**
+ * Test to verify the behavior of
+ * {@link FSQueue#assignContainer(FSSchedulerNode)})
+ *
+ * Create two queues under root (fifoQueue and fairParent), and two queues
+ * under fairParent (fairChild1 and fairChild2). Submit two apps to the
+ * fifoQueue and one each to the fairChild* queues, all apps requiring 4
+ * containers each of the total 16 container capacity
+ *
+ * Assert the number of containers for each app after 4, 8, 12 and 16
updates.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 5000)
+ public void testAssignContainer() throws Exception {
+ final String user = "user1";
+ final String fifoQueue = "fifo";
+ final String fairParent = "fairParent";
+ final String fairChild1 = fairParent + ".fairChild1";
+ final String fairChild2 = fairParent + ".fairChild2";
+
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(8192));
+ RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(8192));
+
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+
+ scheduler.handle(nodeEvent1);
+ scheduler.handle(nodeEvent2);
+
+ ApplicationAttemptId attId1 =
+ createSchedulingRequest(1024, fifoQueue, user, 4);
+ ApplicationAttemptId attId2 =
+ createSchedulingRequest(1024, fairChild1, user, 4);
+ ApplicationAttemptId attId3 =
+ createSchedulingRequest(1024, fairChild2, user, 4);
+ ApplicationAttemptId attId4 =
+ createSchedulingRequest(1024, fifoQueue, user, 4);
+
+ FSSchedulerApp app1 = scheduler.applications.get(attId1);
+ FSSchedulerApp app2 = scheduler.applications.get(attId2);
+ FSSchedulerApp app3 = scheduler.applications.get(attId3);
+ FSSchedulerApp app4 = scheduler.applications.get(attId4);
+
+ scheduler.getQueueManager().getLeafQueue(fifoQueue)
+ .setPolicy(SchedulingPolicy.parse("fifo"));
+ scheduler.update();
+
+ NodeUpdateSchedulerEvent updateEvent1 = new
NodeUpdateSchedulerEvent(node1);
+ NodeUpdateSchedulerEvent updateEvent2 = new
NodeUpdateSchedulerEvent(node2);
+
+ for (int i = 0; i < 8; i++) {
+ scheduler.handle(updateEvent1);
+ scheduler.handle(updateEvent2);
+ if ((i + 1) % 2 == 0) {
+ // 4 node updates: fifoQueue should have received 2, and fairChild*
+ // should have received one each
+ String ERR =
+ "Wrong number of assigned containers after " + (i + 1) + "
updates";
+ if (i < 4) {
+ // app1 req still not met
+ assertEquals(ERR, (i + 1), app1.getLiveContainers().size());
+ assertEquals(ERR, 0, app4.getLiveContainers().size());
+ } else {
+ // app1 req has been met, app4 should be served now
+ assertEquals(ERR, 4, app1.getLiveContainers().size());
+ assertEquals(ERR, (i - 3), app4.getLiveContainers().size());
+ }
+ assertEquals(ERR, (i + 1) / 2, app2.getLiveContainers().size());
+ assertEquals(ERR, (i + 1) / 2, app3.getLiveContainers().size());
+ }
+ }
+ }
@SuppressWarnings("unchecked")
@Test