Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Aug 19 23:49:39 2014 @@ -20,14 +20,11 @@ package org.apache.hadoop.yarn.server.re import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -39,7 +36,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -53,9 +49,6 @@ import 
org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; @@ -76,10 +69,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -99,6 +90,7 @@ import org.apache.hadoop.yarn.util.resou import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * A scheduler that schedules resources between a set of queues. 
The scheduler @@ -122,11 +114,10 @@ import com.google.common.annotations.Vis @LimitedPrivate("yarn") @Unstable @SuppressWarnings("unchecked") -public class FairScheduler extends AbstractYarnScheduler { - private boolean initialized; +public class FairScheduler extends + AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> { private FairSchedulerConfiguration conf; - private Resource minimumAllocation; - private Resource maximumAllocation; + private Resource incrAllocation; private QueueManager queueMgr; private Clock clock; @@ -142,25 +133,32 @@ public class FairScheduler extends Abstr public static final Resource CONTAINER_RESERVED = Resources.createResource(-1); // How often fair shares are re-calculated (ms) - protected long UPDATE_INTERVAL = 500; + protected long updateInterval; + private final int UPDATE_DEBUG_FREQUENCY = 5; + private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; + + @VisibleForTesting + Thread updateThread; + + @VisibleForTesting + Thread schedulingThread; + // timeout to join when we stop this service + protected final long THREAD_JOIN_TIMEOUT_MS = 1000; // Aggregate metrics FSQueueMetrics rootMetrics; + FSOpDurations fsOpDurations; // Time when we last updated preemption vars protected long lastPreemptionUpdateTime; // Time we last ran preemptTasksIfNecessary private long lastPreemptCheckTime; - // Nodes in the cluster, indexed by NodeId - private Map<NodeId, FSSchedulerNode> nodes = - new ConcurrentHashMap<NodeId, FSSchedulerNode>(); - - // Aggregate capacity of the cluster - private Resource clusterCapacity = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); + // Preemption related variables + protected boolean preemptionEnabled; + protected float preemptionUtilizationThreshold; - // How often tasks are preempted + // How often tasks are preempted protected long preemptionInterval; // ms to wait before force killing stuff (must be longer than a couple @@ -170,7 +168,6 @@ public class FairScheduler 
extends Abstr // Containers whose AMs have been warned that they will be preempted soon. private List<RMContainer> warnedContainers = new ArrayList<RMContainer>(); - protected boolean preemptionEnabled; protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not @@ -194,6 +191,7 @@ public class FairScheduler extends Abstr AllocationConfiguration allocConf; public FairScheduler() { + super(FairScheduler.class.getName()); clock = new SystemClock(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); @@ -246,34 +244,25 @@ public class FairScheduler extends Abstr return queueMgr; } - @Override - public RMContainer getRMContainer(ContainerId containerId) { - FSSchedulerApp attempt = getCurrentAttemptForContainer(containerId); - return (attempt == null) ? null : attempt.getRMContainer(containerId); - } - - private FSSchedulerApp getCurrentAttemptForContainer( - ContainerId containerId) { - SchedulerApplication app = - applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); - if (app != null) { - return (FSSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - /** - * A runnable which calls {@link FairScheduler#update()} every - * <code>UPDATE_INTERVAL</code> milliseconds. + * Thread which calls {@link FairScheduler#update()} every + * <code>updateInterval</code> milliseconds. 
*/ - private class UpdateThread implements Runnable { + private class UpdateThread extends Thread { + + @Override public void run() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { try { - Thread.sleep(UPDATE_INTERVAL); + Thread.sleep(updateInterval); + long start = getClock().getTime(); update(); preemptTasksIfNecessary(); + long duration = getClock().getTime() - start; + fsOpDurations.addUpdateThreadRunDuration(duration); + } catch (InterruptedException ie) { + LOG.warn("Update thread interrupted. Exiting."); + return; } catch (Exception e) { LOG.error("Exception in fair scheduler UpdateThread", e); } @@ -282,11 +271,32 @@ public class FairScheduler extends Abstr } /** + * Thread which attempts scheduling resources continuously, + * asynchronous to the node heartbeats. + */ + private class ContinuousSchedulingThread extends Thread { + + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + continuousSchedulingAttempt(); + Thread.sleep(getContinuousSchedulingSleepMs()); + } catch (InterruptedException e) { + LOG.warn("Continuous scheduling thread interrupted. Exiting.", e); + return; + } + } + } + } + + /** * Recompute the internal variables used by the scheduler - per-job weights, * fair shares, deficits, minimum slot allocations, and amount of used and * required resources per job. 
*/ protected synchronized void update() { + long start = getClock().getTime(); updatePreemptionVariables(); // Determine if any queues merit preemption FSQueue rootQueue = queueMgr.getRootQueue(); @@ -294,10 +304,25 @@ public class FairScheduler extends Abstr // Recursively update demands for all queues rootQueue.updateDemand(); - rootQueue.setFairShare(clusterCapacity); + rootQueue.setFairShare(clusterResource); // Recursively compute fair shares for all queues // and update metrics rootQueue.recomputeShares(); + + if (LOG.isDebugEnabled()) { + if (--updatesToSkipForDebug < 0) { + updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; + LOG.debug("Cluster Capacity: " + clusterResource + + " Allocations: " + rootMetrics.getAllocatedResources() + + " Availability: " + Resource.newInstance( + rootMetrics.getAvailableMB(), + rootMetrics.getAvailableVirtualCores()) + + " Demand: " + rootQueue.getDemand()); + } + } + + long duration = getClock().getTime() - start; + fsOpDurations.addUpdateCallDuration(duration); } /** @@ -306,7 +331,7 @@ public class FairScheduler extends Abstr * for each type of task. */ private void updatePreemptionVariables() { - long now = clock.getTime(); + long now = getClock().getTime(); lastPreemptionUpdateTime = now; for (FSLeafQueue sched : queueMgr.getLeafQueues()) { if (!isStarvedForMinShare(sched)) { @@ -322,9 +347,9 @@ public class FairScheduler extends Abstr * Is a queue below its min share for the given task type? */ boolean isStarvedForMinShare(FSLeafQueue sched) { - Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getMinShare(), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, + return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredShare); } @@ -333,9 +358,10 @@ public class FairScheduler extends Abstr * defined as being below half its fair share. 
*/ boolean isStarvedForFairShare(FSLeafQueue sched) { - Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, + clusterResource, Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity, + return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, sched.getResourceUsage(), desiredFairShare); } @@ -347,111 +373,98 @@ public class FairScheduler extends Abstr * and then select the right ones using preemptTasks. */ protected synchronized void preemptTasksIfNecessary() { - if (!preemptionEnabled) { + if (!shouldAttemptPreemption()) { return; } - long curTime = clock.getTime(); + long curTime = getClock().getTime(); if (curTime - lastPreemptCheckTime < preemptionInterval) { return; } lastPreemptCheckTime = curTime; - Resource resToPreempt = Resources.none(); - + Resource resToPreempt = Resources.clone(Resources.none()); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); + Resources.addTo(resToPreempt, resToPreempt(sched, curTime)); } - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, resToPreempt, + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { - preemptResources(queueMgr.getLeafQueues(), resToPreempt); + preemptResources(resToPreempt); } } /** - * Preempt a quantity of resources from a list of QueueSchedulables. The - * policy for this is to pick apps from queues that are over their fair share, - * but make sure that no queue is placed below its fair share in the process. - * We further prioritize preemption by choosing containers with lowest - * priority to preempt. + * Preempt a quantity of resources. Each round, we start from the root queue, + * level-by-level, until choosing a candidate application. 
+ * The policy for prioritizing preemption for each queue depends on its + * SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is + * most over its fair share; (2) FIFO, choose the childSchedulable that is + * latest launched. + * Inside each application, we further prioritize preemption by choosing + * containers with lowest priority to preempt. + * We make sure that no queue is placed below its fair share in the process. */ - protected void preemptResources(Collection<FSLeafQueue> scheds, - Resource toPreempt) { - if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) { + protected void preemptResources(Resource toPreempt) { + long start = getClock().getTime(); + if (Resources.equals(toPreempt, Resources.none())) { return; } - Map<RMContainer, FSSchedulerApp> apps = - new HashMap<RMContainer, FSSchedulerApp>(); - Map<RMContainer, FSLeafQueue> queues = - new HashMap<RMContainer, FSLeafQueue>(); - - // Collect running containers from over-scheduled queues - List<RMContainer> runningContainers = new ArrayList<RMContainer>(); - for (FSLeafQueue sched : scheds) { - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - sched.getResourceUsage(), sched.getFairShare())) { - for (AppSchedulable as : sched.getRunnableAppSchedulables()) { - for (RMContainer c : as.getApp().getLiveContainers()) { - runningContainers.add(c); - apps.put(c, as.getApp()); - queues.put(c, sched); - } - } - } - } - - // Sort containers into reverse order of priority - Collections.sort(runningContainers, new Comparator<RMContainer>() { - public int compare(RMContainer c1, RMContainer c2) { - int ret = c1.getContainer().getPriority().compareTo( - c2.getContainer().getPriority()); - if (ret == 0) { - return c2.getContainerId().compareTo(c1.getContainerId()); - } - return ret; - } - }); - // Scan down the list of containers we've already warned and kill them // if we need to. 
Remove any containers from the list that we don't need // or that are no longer running. Iterator<RMContainer> warnedIter = warnedContainers.iterator(); - Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>(); while (warnedIter.hasNext()) { RMContainer container = warnedIter.next(); - if (container.getState() == RMContainerState.RUNNING && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + if ((container.getState() == RMContainerState.RUNNING || + container.getState() == RMContainerState.ALLOCATED) && + Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, toPreempt, Resources.none())) { - warnOrKillContainer(container, apps.get(container), queues.get(container)); - preemptedThisRound.add(container); + warnOrKillContainer(container); Resources.subtractFrom(toPreempt, container.getContainer().getResource()); } else { warnedIter.remove(); } } - // Scan down the rest of the containers until we've preempted enough, making - // sure we don't preempt too many from any queue - Iterator<RMContainer> runningIter = runningContainers.iterator(); - while (runningIter.hasNext() && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - toPreempt, Resources.none())) { - RMContainer container = runningIter.next(); - FSLeafQueue sched = queues.get(container); - if (!preemptedThisRound.contains(container) && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - sched.getResourceUsage(), sched.getFairShare())) { - warnOrKillContainer(container, apps.get(container), sched); - - warnedContainers.add(container); - Resources.subtractFrom(toPreempt, container.getContainer().getResource()); + try { + // Reset preemptedResource for each app + for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { + for (FSAppAttempt app : queue.getRunnableAppSchedulables()) { + app.resetPreemptedResources(); + } + } + + while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, + toPreempt, Resources.none())) { + RMContainer container = + 
getQueueManager().getRootQueue().preemptContainer(); + if (container == null) { + break; + } else { + warnOrKillContainer(container); + warnedContainers.add(container); + Resources.subtractFrom( + toPreempt, container.getContainer().getResource()); + } + } + } finally { + // Clear preemptedResources for each app + for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { + for (FSAppAttempt app : queue.getRunnableAppSchedulables()) { + app.clearPreemptedResources(); + } } } + + long duration = getClock().getTime() - start; + fsOpDurations.addPreemptCallDuration(duration); } - private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, - FSLeafQueue queue) { + protected void warnOrKillContainer(RMContainer container) { + ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); + FSAppAttempt app = getSchedulerApp(appAttemptId); + FSLeafQueue queue = app.getQueue(); LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + ") from queue " + queue.getName()); @@ -461,21 +474,22 @@ public class FairScheduler extends Abstr if (time != null) { // if we asked for preemption more than maxWaitTimeBeforeKill ms ago, // proceed with kill - if (time + waitTimeBeforeKill < clock.getTime()) { + if (time + waitTimeBeforeKill < getClock().getTime()) { ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); + recoverResourceRequestForContainer(container); // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). 
completedContainer(container, status, RMContainerEventType.KILL); LOG.info("Killing container" + container + " (after waiting for premption for " + - (clock.getTime() - time) + "ms)"); + (getClock().getTime() - time) + "ms)"); } } else { - // track the request in the FSSchedulerApp itself - app.addPreemption(container, clock.getTime()); + // track the request in the FSAppAttempt itself + app.addPreemption(container, getClock().getTime()); } } @@ -496,20 +510,20 @@ public class FairScheduler extends Abstr Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getMinShare(), sched.getDemand()); - resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterCapacity, + Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); - resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterCapacity, + Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource, resDueToMinShare, resDueToFairShare); - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, + if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { String message = "Should preempt " + resToPreempt + " res for queue " + 
sched.getName() + ": resDueToMinShare = " + resDueToMinShare @@ -519,12 +533,13 @@ public class FairScheduler extends Abstr return resToPreempt; } - public RMContainerTokenSecretManager getContainerTokenSecretManager() { + public synchronized RMContainerTokenSecretManager + getContainerTokenSecretManager() { return rmContext.getContainerTokenSecretManager(); } // synchronized for sizeBasedWeight - public synchronized ResourceWeights getAppWeight(AppSchedulable app) { + public synchronized ResourceWeights getAppWeight(FSAppAttempt app) { double weight = 1.0; if (sizeBasedWeight) { // Set weight based on current memory demand @@ -540,18 +555,12 @@ public class FairScheduler extends Abstr return resourceWeights; } - @Override - public Resource getMinimumResourceCapability() { - return minimumAllocation; - } - public Resource getIncrementResourceCapability() { return incrAllocation; } - @Override - public Resource getMaximumResourceCapability() { - return maximumAllocation; + private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) { + return nodes.get(nodeId); } public double getNodeLocalityThreshold() { @@ -578,10 +587,6 @@ public class FairScheduler extends Abstr return continuousSchedulingSleepMs; } - public Resource getClusterCapacity() { - return clusterCapacity; - } - public synchronized Clock getClock() { return clock; } @@ -600,7 +605,7 @@ public class FairScheduler extends Abstr * configured limits, but the app will not be marked as runnable. 
*/ protected synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { + String queueName, String user, boolean isAppRecovering) { if (queueName == null || queueName.isEmpty()) { String message = "Reject application " + applicationId + " submitted by user " + user + " with an empty queue name."; @@ -629,16 +634,22 @@ public class FairScheduler extends Abstr return; } - SchedulerApplication application = - new SchedulerApplication(queue, user); + SchedulerApplication<FSAppAttempt> application = + new SchedulerApplication<FSAppAttempt>(queue, user); applications.put(applicationId, application); queue.getMetrics().submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() + if (isAppRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } else { + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } } /** @@ -646,19 +657,20 @@ public class FairScheduler extends Abstr */ protected synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, - boolean transferStateFromPreviousAttempt) { - SchedulerApplication application = + boolean transferStateFromPreviousAttempt, + boolean isAttemptRecovering) { + SchedulerApplication<FSAppAttempt> application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); FSLeafQueue queue = (FSLeafQueue) application.getQueue(); - FSSchedulerApp attempt = - new FSSchedulerApp(applicationAttemptId, user, + FSAppAttempt attempt = + new FSAppAttempt(this, applicationAttemptId, user, queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext); if (transferStateFromPreviousAttempt) { attempt.transferStateFromPreviousAttempt(application - 
.getCurrentAppAttempt()); + .getCurrentAppAttempt()); } application.setCurrentAppAttempt(attempt); @@ -674,9 +686,17 @@ public class FairScheduler extends Abstr LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); - rmContext.getDispatcher().getEventHandler().handle( + + if (isAttemptRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); + } + } else { + rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(applicationAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); + } } /** @@ -720,7 +740,8 @@ public class FairScheduler extends Abstr private synchronized void removeApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication application = applications.get(applicationId); + SchedulerApplication<FSAppAttempt> application = + applications.get(applicationId); if (application == null){ LOG.warn("Couldn't find application " + applicationId); return; @@ -734,9 +755,9 @@ public class FairScheduler extends Abstr RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) { LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - SchedulerApplication application = + SchedulerApplication<FSAppAttempt> application = applications.get(applicationAttemptId.getApplicationId()); - FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId); + FSAppAttempt attempt = getSchedulerApp(applicationAttemptId); if (attempt == null || application == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); @@ -787,7 +808,8 @@ public class FairScheduler extends Abstr /** * Clean up a completed container. 
*/ - private synchronized void completedContainer(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -797,7 +819,7 @@ public class FairScheduler extends Abstr Container container = rmContainer.getContainer(); // Get the application for the finished container - FSSchedulerApp application = + FSAppAttempt application = getCurrentAttemptForContainer(container.getId()); ApplicationId appId = container.getId().getApplicationAttemptId().getApplicationId(); @@ -809,11 +831,10 @@ public class FairScheduler extends Abstr } // Get the node on which the container was allocated - FSSchedulerNode node = nodes.get(container.getNodeId()); + FSSchedulerNode node = getFSSchedulerNode(container.getNodeId()); if (rmContainer.getState() == RMContainerState.RESERVED) { - application.unreserve(node, rmContainer.getReservedPriority()); - node.unreserveResource(application); + application.unreserve(rmContainer.getReservedPriority(), node); } else { application.containerCompleted(rmContainer, containerStatus, event); node.releaseContainer(container); @@ -827,20 +848,20 @@ public class FairScheduler extends Abstr private synchronized void addNode(RMNode node) { nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); - Resources.addTo(clusterCapacity, node.getTotalCapability()); + Resources.addTo(clusterResource, node.getTotalCapability()); updateRootQueueMetrics(); LOG.info("Added node " + node.getNodeAddress() + - " cluster capacity: " + clusterCapacity); + " cluster capacity: " + clusterResource); } private synchronized void removeNode(RMNode rmNode) { - FSSchedulerNode node = nodes.get(rmNode.getNodeID()); + FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID()); // This can occur when an UNHEALTHY node reconnects if (node == null) { return; } - 
Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); + Resources.subtractFrom(clusterResource, rmNode.getTotalCapability()); updateRootQueueMetrics(); // Remove running containers @@ -865,7 +886,7 @@ public class FairScheduler extends Abstr nodes.remove(rmNode.getNodeID()); LOG.info("Removed node " + rmNode.getNodeAddress() + - " cluster capacity: " + clusterCapacity); + " cluster capacity: " + clusterResource); } @Override @@ -873,7 +894,7 @@ public class FairScheduler extends Abstr List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) { // Make sure this application exists - FSSchedulerApp application = getSchedulerApp(appAttemptId); + FSAppAttempt application = getSchedulerApp(appAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + appAttemptId); @@ -882,25 +903,17 @@ public class FairScheduler extends Abstr // Sanity check SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), - clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation); + clusterResource, minimumAllocation, maximumAllocation, incrAllocation); - // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FairScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); + // Set amResource for this app + if (!application.getUnmanagedAM() && ask.size() == 1 + && application.getLiveContainers().isEmpty()) { + 
application.setAMResource(ask.get(0).getCapability()); } + // Release containers + releaseContainers(release, application); + synchronized (application) { if (!ask.isEmpty()) { if (LOG.isDebugEnabled()) { @@ -913,14 +926,14 @@ public class FairScheduler extends Abstr // Update application requests application.updateResourceRequests(ask); - LOG.debug("allocate: post-update"); application.showRequests(); } if (LOG.isDebugEnabled()) { - LOG.debug("allocate:" + + LOG.debug("allocate: post-update" + " applicationAttemptId=" + appAttemptId + - " #ask=" + ask.size()); + " #ask=" + ask.size() + + " reservation= " + application.getCurrentReservation()); LOG.debug("Preempting " + application.getPreemptionContainers().size() + " container(s)"); @@ -941,33 +954,18 @@ public class FairScheduler extends Abstr } /** - * Process a container which has launched on a node, as reported by the node. - */ - private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { - // Get the application for the finished container - FSSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - - /** * Process a heartbeat update from a node. 
*/ private synchronized void nodeUpdate(RMNode nm) { + long start = getClock().getTime(); if (LOG.isDebugEnabled()) { - LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity); + LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource); } eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = nodes.get(nm.getNodeID()); + FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID()); // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG); + SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG); List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); @@ -996,41 +994,38 @@ public class FairScheduler extends Abstr } else { attemptScheduling(node); } + + long duration = getClock().getTime() - start; + fsOpDurations.addNodeUpdateDuration(duration); } - private void continuousScheduling() { - while (true) { - List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet()); - // Sort the nodes by space available on them, so that we offer - // containers on emptier nodes first, facilitating an even spread. This - // requires holding the scheduler lock, so that the space available on a - // node doesn't change during the sort. 
- synchronized (this) { - Collections.sort(nodeIdList, nodeAvailableResourceComparator); - } - - // iterate all nodes - for (NodeId nodeId : nodeIdList) { - if (nodes.containsKey(nodeId)) { - FSSchedulerNode node = nodes.get(nodeId); - try { - if (Resources.fitsIn(minimumAllocation, - node.getAvailableResource())) { - attemptScheduling(node); - } - } catch (Throwable ex) { - LOG.warn("Error while attempting scheduling for node " + node + - ": " + ex.toString(), ex); - } - } - } + void continuousSchedulingAttempt() throws InterruptedException { + long start = getClock().getTime(); + List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet()); + // Sort the nodes by space available on them, so that we offer + // containers on emptier nodes first, facilitating an even spread. This + // requires holding the scheduler lock, so that the space available on a + // node doesn't change during the sort. + synchronized (this) { + Collections.sort(nodeIdList, nodeAvailableResourceComparator); + } + + // iterate all nodes + for (NodeId nodeId : nodeIdList) { + FSSchedulerNode node = getFSSchedulerNode(nodeId); try { - Thread.sleep(getContinuousSchedulingSleepMs()); - } catch (InterruptedException e) { - LOG.warn("Error while doing sleep in continuous scheduling: " + - e.toString(), e); + if (node != null && Resources.fitsIn(minimumAllocation, + node.getAvailableResource())) { + attemptScheduling(node); + } + } catch (Throwable ex) { + LOG.error("Error while attempting scheduling for node " + node + + ": " + ex.toString(), ex); } } + + long duration = getClock().getTime() - start; + fsOpDurations.addContinuousSchedulingRunDuration(duration); } /** Sort nodes by available resource */ @@ -1038,7 +1033,13 @@ public class FairScheduler extends Abstr @Override public int compare(NodeId n1, NodeId n2) { - return RESOURCE_CALCULATOR.compare(clusterCapacity, + if (!nodes.containsKey(n1)) { + return 1; + } + if (!nodes.containsKey(n2)) { + return -1; + } + return 
RESOURCE_CALCULATOR.compare(clusterResource, nodes.get(n2).getAvailableResource(), nodes.get(n1).getAvailableResource()); } @@ -1049,13 +1050,13 @@ public class FairScheduler extends Abstr // 1. Check for reserved applications // 2. Schedule if there are no reservations - AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable(); + FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); if (reservedAppSchedulable != null) { Priority reservedPriority = node.getReservedContainer().getReservedPriority(); if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) { // Don't hold the reservation if app can no longer use it LOG.info("Releasing reservation that cannot be satisfied for application " - + reservedAppSchedulable.getApp().getApplicationAttemptId() + + reservedAppSchedulable.getApplicationAttemptId() + " on node " + node); reservedAppSchedulable.unreserve(reservedPriority, node); reservedAppSchedulable = null; @@ -1063,7 +1064,7 @@ public class FairScheduler extends Abstr // Reservation exists; try to fulfill the reservation if (LOG.isDebugEnabled()) { LOG.debug("Trying to fulfill reservation for application " - + reservedAppSchedulable.getApp().getApplicationAttemptId() + + reservedAppSchedulable.getApplicationAttemptId() + " on node: " + node); } @@ -1075,9 +1076,8 @@ public class FairScheduler extends Abstr int assignedContainers = 0; while (node.getReservedContainer() == null) { boolean assignedContainer = false; - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, - queueMgr.getRootQueue().assignContainer(node), - Resources.none())) { + if (!queueMgr.getRootQueue().assignContainer(node).equals( + Resources.none())) { assignedContainers++; assignedContainer = true; } @@ -1089,45 +1089,8 @@ public class FairScheduler extends Abstr updateRootQueueMetrics(); } - @Override - public SchedulerNodeReport getNodeReport(NodeId nodeId) { - FSSchedulerNode node = nodes.get(nodeId); - return node == null ? 
null : new SchedulerNodeReport(node); - } - - public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) { - SchedulerApplication app = - applications.get(appAttemptId.getApplicationId()); - if (app != null) { - return (FSSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId appAttemptId) { - FSSchedulerApp attempt = getSchedulerApp(appAttemptId); - if (attempt == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); - } - return null; - } - return new SchedulerAppReport(attempt); - } - - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( - ApplicationAttemptId appAttemptId) { - FSSchedulerApp attempt = getSchedulerApp(appAttemptId); - if (attempt == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); - } - return null; - } - return attempt.getResourceUsageReport(); + public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { + return super.getApplicationAttempt(appAttemptId); } /** @@ -1139,7 +1102,23 @@ public class FairScheduler extends Abstr private void updateRootQueueMetrics() { rootMetrics.setAvailableResourcesToQueue( Resources.subtract( - clusterCapacity, rootMetrics.getAllocatedResources())); + clusterResource, rootMetrics.getAllocatedResources())); + } + + /** + * Check if preemption is enabled and the utilization threshold for + * preemption is met. + * + * @return true if preemption should be attempted, false otherwise. 
+ */ + private boolean shouldAttemptPreemption() { + if (preemptionEnabled) { + return (preemptionUtilizationThreshold < Math.max( + (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(), + (float) rootMetrics.getAllocatedVirtualCores() / + clusterResource.getVirtualCores())); + } + return false; } @Override @@ -1156,6 +1135,8 @@ public class FairScheduler extends Abstr } NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); break; case NODE_REMOVED: if (!(event instanceof NodeRemovedSchedulerEvent)) { @@ -1177,7 +1158,8 @@ public class FairScheduler extends Abstr } AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); break; case APP_REMOVED: if (!(event instanceof AppRemovedSchedulerEvent)) { @@ -1194,7 +1176,8 @@ public class FairScheduler extends Abstr AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getIsAttemptRecovering()); break; case APP_ATTEMPT_REMOVED: if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { @@ -1230,85 +1213,134 @@ public class FairScheduler extends Abstr // NOT IMPLEMENTED } - @Override - public synchronized void reinitialize(Configuration conf, RMContext rmContext) + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + private synchronized void initScheduler(Configuration conf) throws IOException { - if (!initialized) { - this.conf = new 
FairSchedulerConfiguration(conf); - validateConf(this.conf); - minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); - incrAllocation = this.conf.getIncrementAllocation(); - continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); - continuousSchedulingSleepMs = - this.conf.getContinuousSchedulingSleepMs(); - nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); - rackLocalityThreshold = this.conf.getLocalityThresholdRack(); - nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); - rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); - preemptionEnabled = this.conf.getPreemptionEnabled(); - assignMultiple = this.conf.getAssignMultiple(); - maxAssign = this.conf.getMaxAssign(); - sizeBasedWeight = this.conf.getSizeBasedWeight(); - preemptionInterval = this.conf.getPreemptionInterval(); - waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); - usePortForNodeName = this.conf.getUsePortForNodeName(); - - rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); - this.rmContext = rmContext; - // This stores per-application scheduling information - this.applications = - new ConcurrentHashMap<ApplicationId, SchedulerApplication>(); - this.eventLog = new FairSchedulerEventLog(); - eventLog.init(this.conf); + this.conf = new FairSchedulerConfiguration(conf); + validateConf(this.conf); + minimumAllocation = this.conf.getMinimumAllocation(); + maximumAllocation = this.conf.getMaximumAllocation(); + incrAllocation = this.conf.getIncrementAllocation(); + continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); + continuousSchedulingSleepMs = + this.conf.getContinuousSchedulingSleepMs(); + nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); + rackLocalityThreshold = this.conf.getLocalityThresholdRack(); + nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); + rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); + preemptionEnabled = 
this.conf.getPreemptionEnabled(); + preemptionUtilizationThreshold = + this.conf.getPreemptionUtilizationThreshold(); + assignMultiple = this.conf.getAssignMultiple(); + maxAssign = this.conf.getMaxAssign(); + sizeBasedWeight = this.conf.getSizeBasedWeight(); + preemptionInterval = this.conf.getPreemptionInterval(); + waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); + usePortForNodeName = this.conf.getUsePortForNodeName(); + + updateInterval = this.conf.getUpdateInterval(); + if (updateInterval < 0) { + updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS; + LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS + + " is invalid, so using default value " + + + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS + + " ms instead"); + } + + rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); + fsOpDurations = FSOpDurations.getInstance(true); + + // This stores per-application scheduling information + this.applications = new ConcurrentHashMap< + ApplicationId, SchedulerApplication<FSAppAttempt>>(); + this.eventLog = new FairSchedulerEventLog(); + eventLog.init(this.conf); - initialized = true; + allocConf = new AllocationConfiguration(conf); + try { + queueMgr.initialize(conf); + } catch (Exception e) { + throw new IOException("Failed to start FairScheduler", e); + } - allocConf = new AllocationConfiguration(conf); - try { - queueMgr.initialize(conf); - } catch (Exception e) { - throw new IOException("Failed to start FairScheduler", e); - } + updateThread = new UpdateThread(); + updateThread.setName("FairSchedulerUpdateThread"); + updateThread.setDaemon(true); - Thread updateThread = new Thread(new UpdateThread()); - updateThread.setName("FairSchedulerUpdateThread"); - updateThread.setDaemon(true); - updateThread.start(); + if (continuousSchedulingEnabled) { + // start continuous scheduling thread + schedulingThread = new ContinuousSchedulingThread(); + schedulingThread.setName("FairSchedulerContinuousScheduling"); + 
schedulingThread.setDaemon(true); + } - if (continuousSchedulingEnabled) { - // start continuous scheduling thread - Thread schedulingThread = new Thread( - new Runnable() { - @Override - public void run() { - continuousScheduling(); - } - } - ); - schedulingThread.setName("ContinuousScheduling"); - schedulingThread.setDaemon(true); - schedulingThread.start(); + allocsLoader.init(conf); + allocsLoader.setReloadListener(new AllocationReloadListener()); + // If we fail to load allocations file on initialize, we want to fail + // immediately. After a successful load, exceptions on future reloads + // will just result in leaving things as they are. + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + throw new IOException("Failed to initialize FairScheduler", e); + } + } + + private synchronized void startSchedulerThreads() { + Preconditions.checkNotNull(updateThread, "updateThread is null"); + Preconditions.checkNotNull(allocsLoader, "allocsLoader is null"); + updateThread.start(); + if (continuousSchedulingEnabled) { + Preconditions.checkNotNull(schedulingThread, "schedulingThread is null"); + schedulingThread.start(); + } + allocsLoader.start(); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + initScheduler(conf); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + startSchedulerThreads(); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + synchronized (this) { + if (updateThread != null) { + updateThread.interrupt(); + updateThread.join(THREAD_JOIN_TIMEOUT_MS); } - - allocsLoader.init(conf); - allocsLoader.setReloadListener(new AllocationReloadListener()); - // If we fail to load allocations file on initialize, we want to fail - // immediately. After a successful load, exceptions on future reloads - // will just result in leaving things as they are. 
- try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - throw new IOException("Failed to initialize FairScheduler", e); + if (continuousSchedulingEnabled) { + if (schedulingThread != null) { + schedulingThread.interrupt(); + schedulingThread.join(THREAD_JOIN_TIMEOUT_MS); + } } - allocsLoader.start(); - } else { - try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - LOG.error("Failed to reload allocations file", e); + if (allocsLoader != null) { + allocsLoader.stop(); } } + + super.serviceStop(); + } + + @Override + public synchronized void reinitialize(Configuration conf, RMContext rmContext) + throws IOException { + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + LOG.error("Failed to reload allocations file", e); + } } @Override @@ -1323,7 +1355,7 @@ public class FairScheduler extends Abstr @Override public List<QueueUserACLInfo> getQueueUserAclInfo() { - UserGroupInformation user = null; + UserGroupInformation user; try { user = UserGroupInformation.getCurrentUser(); } catch (IOException ioe) { @@ -1365,7 +1397,7 @@ public class FairScheduler extends Abstr // if it does not already exist, so it can be displayed on the web UI. 
synchronized (FairScheduler.this) { allocConf = queueInfo; - allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity); + allocConf.getDefaultSchedulingPolicy().initialize(clusterResource); queueMgr.updateAllocationConfiguration(allocConf); } } @@ -1385,11 +1417,11 @@ public class FairScheduler extends Abstr @Override public synchronized String moveApplication(ApplicationId appId, String queueName) throws YarnException { - SchedulerApplication app = applications.get(appId); + SchedulerApplication<FSAppAttempt> app = applications.get(appId); if (app == null) { throw new YarnException("App to be moved " + appId + " not found."); } - FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt(); + FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt(); // To serialize with FairScheduler#allocate, synchronize on app attempt synchronized (attempt) { FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue(); @@ -1402,8 +1434,7 @@ public class FairScheduler extends Abstr return oldQueue.getQueueName(); } - if (oldQueue.getRunnableAppSchedulables().contains( - attempt.getAppSchedulable())) { + if (oldQueue.getRunnableAppSchedulables().contains(attempt)) { verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue); } @@ -1412,7 +1443,7 @@ public class FairScheduler extends Abstr } } - private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app, + private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app, FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException { String queueName = targetQueue.getQueueName(); ApplicationAttemptId appAttId = app.getApplicationAttemptId(); @@ -1449,8 +1480,8 @@ public class FairScheduler extends Abstr * Helper for moveApplication, which has appropriate synchronization, so all * operations will be atomic. 
*/ - private void executeMove(SchedulerApplication app, FSSchedulerApp attempt, - FSLeafQueue oldQueue, FSLeafQueue newQueue) { + private void executeMove(SchedulerApplication<FSAppAttempt> app, + FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) { boolean wasRunnable = oldQueue.removeApp(attempt); // if app was not runnable before, it may be runnable now boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue, @@ -1478,8 +1509,9 @@ public class FairScheduler extends Abstr maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, oldQueue); } } - - private FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) { + + @VisibleForTesting + FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) { // Because queue names include ancestors, separated by periods, we can find // the lowest common ancestors by going from the start of the names until // there's a character that doesn't match. @@ -1491,7 +1523,7 @@ public class FairScheduler extends Abstr for (int i = 0; i < Math.max(name1.length(), name2.length()); i++) { if (name1.length() <= i || name2.length() <= i || name1.charAt(i) != name2.charAt(i)) { - return queueMgr.getQueue(name1.substring(lastPeriodIndex)); + return queueMgr.getQueue(name1.substring(0, lastPeriodIndex)); } else if (name1.charAt(i) == '.') { lastPeriodIndex = i; }
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Tue Aug 19 23:49:39 2014 @@ -101,6 +101,10 @@ public class FairSchedulerConfiguration /** Whether preemption is enabled. 
*/ protected static final String PREEMPTION = CONF_PREFIX + "preemption"; protected static final boolean DEFAULT_PREEMPTION = false; + + protected static final String PREEMPTION_THRESHOLD = + CONF_PREFIX + "preemption.cluster-utilization-threshold"; + protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f; protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval"; protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000; @@ -119,6 +123,11 @@ public class FairSchedulerConfiguration protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign"; protected static final int DEFAULT_MAX_ASSIGN = -1; + /** The update interval for calculating resources in FairScheduler .*/ + public static final String UPDATE_INTERVAL_MS = + CONF_PREFIX + "update-interval-ms"; + public static final int DEFAULT_UPDATE_INTERVAL_MS = 500; + public FairSchedulerConfiguration() { super(); } @@ -185,6 +194,10 @@ public class FairSchedulerConfiguration return getBoolean(PREEMPTION, DEFAULT_PREEMPTION); } + public float getPreemptionUtilizationThreshold() { + return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD); + } + public boolean getAssignMultiple() { return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE); } @@ -238,6 +251,10 @@ public class FairSchedulerConfiguration "Error reading resource config", ex); } } + + public long getUpdateInterval() { + return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS); + } private static int findResource(String val, String units) throws AllocationConfigurationException { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java Tue Aug 19 23:49:39 2014 @@ -25,15 +25,15 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability.Unstable; /** - * Order {@link AppSchedulable} objects by priority and then by submit time, as + * Order {@link FSAppAttempt} objects by priority and then by submit time, as * in the default scheduler in Hadoop. 
*/ @Private @Unstable -public class FifoAppComparator implements Comparator<AppSchedulable>, Serializable { +public class FifoAppComparator implements Comparator<FSAppAttempt>, Serializable { private static final long serialVersionUID = 3428835083489547918L; - public int compare(AppSchedulable a1, AppSchedulable a2) { + public int compare(FSAppAttempt a1, FSAppAttempt a2) { int res = a1.getPriority().compareTo(a2.getPriority()); if (res == 0) { if (a1.getStartTime() < a2.getStartTime()) { @@ -44,7 +44,7 @@ public class FifoAppComparator implement } if (res == 0) { // If there is a tie, break it by app ID to get a deterministic order - res = a1.getApp().getApplicationId().compareTo(a2.getApp().getApplicationId()); + res = a1.getApplicationId().compareTo(a2.getApplicationId()); } return res; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java Tue Aug 19 23:49:39 2014 @@ -43,7 +43,7 @@ public class MaxRunningAppsEnforcer { // Tracks the number of 
running applications by user. private final Map<String, Integer> usersNumRunnableApps; @VisibleForTesting - final ListMultimap<String, AppSchedulable> usersNonRunnableApps; + final ListMultimap<String, FSAppAttempt> usersNonRunnableApps; public MaxRunningAppsEnforcer(FairScheduler scheduler) { this.scheduler = scheduler; @@ -80,7 +80,7 @@ public class MaxRunningAppsEnforcer { * Tracks the given new runnable app for purposes of maintaining max running * app limits. */ - public void trackRunnableApp(FSSchedulerApp app) { + public void trackRunnableApp(FSAppAttempt app) { String user = app.getUser(); FSLeafQueue queue = app.getQueue(); // Increment running counts for all parent queues @@ -99,9 +99,9 @@ public class MaxRunningAppsEnforcer { * Tracks the given new non runnable app so that it can be made runnable when * it would not violate max running app limits. */ - public void trackNonRunnableApp(FSSchedulerApp app) { + public void trackNonRunnableApp(FSAppAttempt app) { String user = app.getUser(); - usersNonRunnableApps.put(user, app.getAppSchedulable()); + usersNonRunnableApps.put(user, app); } /** @@ -111,7 +111,7 @@ public class MaxRunningAppsEnforcer { * Runs in O(n log(n)) where n is the number of queues that are under the * highest queue that went from having no slack to having slack. 
*/ - public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) { + public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); // childqueueX might have no pending apps itself, but if a queue higher up @@ -133,8 +133,8 @@ public class MaxRunningAppsEnforcer { parent = parent.getParent(); } - List<List<AppSchedulable>> appsNowMaybeRunnable = - new ArrayList<List<AppSchedulable>>(); + List<List<FSAppAttempt>> appsNowMaybeRunnable = + new ArrayList<List<FSAppAttempt>>(); // Compile lists of apps which may now be runnable // We gather lists instead of building a set of all non-runnable apps so @@ -150,26 +150,26 @@ public class MaxRunningAppsEnforcer { userNumRunning = 0; } if (userNumRunning == allocConf.getUserMaxApps(user) - 1) { - List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user); + List<FSAppAttempt> userWaitingApps = usersNonRunnableApps.get(user); if (userWaitingApps != null) { appsNowMaybeRunnable.add(userWaitingApps); } } // Scan through and check whether this means that any apps are now runnable - Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator( + Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); - FSSchedulerApp prev = null; - List<AppSchedulable> noLongerPendingApps = new ArrayList<AppSchedulable>(); + FSAppAttempt prev = null; + List<FSAppAttempt> noLongerPendingApps = new ArrayList<FSAppAttempt>(); while (iter.hasNext()) { - FSSchedulerApp next = iter.next(); + FSAppAttempt next = iter.next(); if (next == prev) { continue; } if (canAppBeRunnable(next.getQueue(), next.getUser())) { trackRunnableApp(next); - AppSchedulable appSched = next.getAppSchedulable(); + FSAppAttempt appSched = next; next.getQueue().getRunnableAppSchedulables().add(appSched); noLongerPendingApps.add(appSched); @@ -186,14 +186,14 @@ public class MaxRunningAppsEnforcer { // We remove the apps from 
their pending lists afterwards so that we don't // pull them out from under the iterator. If they are not in these lists // in the first place, there is a bug. - for (AppSchedulable appSched : noLongerPendingApps) { - if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables() + for (FSAppAttempt appSched : noLongerPendingApps) { + if (!appSched.getQueue().getNonRunnableAppSchedulables() .remove(appSched)) { LOG.error("Can't make app runnable that does not already exist in queue" + " as non-runnable: " + appSched + ". This should never happen."); } - if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) { + if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) { LOG.error("Waiting app " + appSched + " expected to be in " + "usersNonRunnableApps, but was not. This should never happen."); } @@ -204,7 +204,7 @@ public class MaxRunningAppsEnforcer { * Updates the relevant tracking variables after a runnable app with the given * queue and user has been removed. */ - public void untrackRunnableApp(FSSchedulerApp app) { + public void untrackRunnableApp(FSAppAttempt app) { // Update usersRunnableApps String user = app.getUser(); int newUserNumRunning = usersNumRunnableApps.get(user) - 1; @@ -226,8 +226,8 @@ public class MaxRunningAppsEnforcer { /** * Stops tracking the given non-runnable app */ - public void untrackNonRunnableApp(FSSchedulerApp app) { - usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable()); + public void untrackNonRunnableApp(FSAppAttempt app) { + usersNonRunnableApps.remove(app.getUser(), app); } /** @@ -235,7 +235,7 @@ public class MaxRunningAppsEnforcer { * of non-runnable applications. 
*/ private void gatherPossiblyRunnableAppLists(FSQueue queue, - List<List<AppSchedulable>> appLists) { + List<List<FSAppAttempt>> appLists) { if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration() .getQueueMaxApps(queue.getName())) { if (queue instanceof FSLeafQueue) { @@ -259,14 +259,14 @@ public class MaxRunningAppsEnforcer { * of O(num lists) time. */ static class MultiListStartTimeIterator implements - Iterator<FSSchedulerApp> { + Iterator<FSAppAttempt> { - private List<AppSchedulable>[] appLists; + private List<FSAppAttempt>[] appLists; private int[] curPositionsInAppLists; private PriorityQueue<IndexAndTime> appListsByCurStartTime; @SuppressWarnings("unchecked") - public MultiListStartTimeIterator(List<List<AppSchedulable>> appListList) { + public MultiListStartTimeIterator(List<List<FSAppAttempt>> appListList) { appLists = appListList.toArray(new List[appListList.size()]); curPositionsInAppLists = new int[appLists.length]; appListsByCurStartTime = new PriorityQueue<IndexAndTime>(); @@ -284,10 +284,10 @@ public class MaxRunningAppsEnforcer { } @Override - public FSSchedulerApp next() { + public FSAppAttempt next() { IndexAndTime indexAndTime = appListsByCurStartTime.remove(); int nextListIndex = indexAndTime.index; - AppSchedulable next = appLists[nextListIndex] + FSAppAttempt next = appLists[nextListIndex] .get(curPositionsInAppLists[nextListIndex]); curPositionsInAppLists[nextListIndex]++; @@ -299,7 +299,7 @@ public class MaxRunningAppsEnforcer { } appListsByCurStartTime.add(indexAndTime); - return next.getApp(); + return next; } @Override Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewAppWeightBooster.java Tue Aug 19 23:49:39 2014 @@ -48,7 +48,7 @@ public class NewAppWeightBooster extends super.setConf(conf); } - public double adjustWeight(AppSchedulable app, double curWeight) { + public double adjustWeight(FSAppAttempt app, double curWeight) { long start = app.getStartTime(); long now = System.currentTimeMillis(); if (now - start < duration) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java 
(original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Tue Aug 19 23:49:39 2014 @@ -74,7 +74,7 @@ public class QueueManager { } /** - * Get a queue by name, creating it if the create param is true and is necessary. + * Get a leaf queue by name, creating it if the create param is true and is necessary. * If the queue is not or can not be a leaf queue, i.e. it already exists as a * parent queue, or one of the parents in its name is already a leaf queue, * null is returned. @@ -85,31 +85,53 @@ public class QueueManager { * could be referred to as just "parent1.queue2". */ public FSLeafQueue getLeafQueue(String name, boolean create) { + FSQueue queue = getQueue(name, create, FSQueueType.LEAF); + if (queue instanceof FSParentQueue) { + return null; + } + return (FSLeafQueue) queue; + } + + /** + * Get a parent queue by name, creating it if the create param is true and is necessary. + * If the queue is not or can not be a parent queue, i.e. it already exists as a + * leaf queue, or one of the parents in its name is already a leaf queue, + * null is returned. + * + * The root part of the name is optional, so a queue underneath the root + * named "queue1" could be referred to as just "queue1", and a queue named + * "queue2" underneath a parent named "parent1" that is underneath the root + * could be referred to as just "parent1.queue2". 
+ */ + public FSParentQueue getParentQueue(String name, boolean create) { + FSQueue queue = getQueue(name, create, FSQueueType.PARENT); + if (queue instanceof FSLeafQueue) { + return null; + } + return (FSParentQueue) queue; + } + + private FSQueue getQueue(String name, boolean create, FSQueueType queueType) { name = ensureRootPrefix(name); synchronized (queues) { FSQueue queue = queues.get(name); if (queue == null && create) { - FSLeafQueue leafQueue = createLeafQueue(name); - if (leafQueue == null) { - return null; - } - queue = leafQueue; - } else if (queue instanceof FSParentQueue) { - return null; + // if the queue doesn't exist, create it and return + queue = createQueue(name, queueType); } - return (FSLeafQueue)queue; + return queue; } } /** - * Creates a leaf queue and places it in the tree. Creates any - * parents that don't already exist. + * Creates a leaf or parent queue based on what is specified in 'queueType' + * and places it in the tree. Creates any parents that don't already exist. * * @return * the created queue, if successful.
null if not allowed (one of the parent * queues in the queue name is already a leaf queue) */ - private FSLeafQueue createLeafQueue(String name) { + private FSQueue createQueue(String name, FSQueueType queueType) { List<String> newQueueNames = new ArrayList<String>(); newQueueNames.add(name); int sepIndex = name.length(); @@ -143,8 +165,7 @@ public class QueueManager { FSLeafQueue leafQueue = null; for (int i = newQueueNames.size()-1; i >= 0; i--) { String queueName = newQueueNames.get(i); - if (i == 0) { - // First name added was the leaf queue + if (i == 0 && queueType != FSQueueType.PARENT) { leafQueue = new FSLeafQueue(name, scheduler, parent); try { leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy()); @@ -155,6 +176,7 @@ public class QueueManager { parent.addChildQueue(leafQueue); queues.put(leafQueue.getName(), leafQueue); leafQueues.add(leafQueue); + return leafQueue; } else { FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); try { @@ -169,53 +191,64 @@ public class QueueManager { } } - return leafQueue; + return parent; } /** - * Make way for the given leaf queue if possible, by removing incompatible + * Make way for the given queue if possible, by removing incompatible * queues with no apps in them. Incompatibility could be due to - * (1) leafToCreate being currently being a parent, or (2) an existing leaf queue in - * the ancestry of leafToCreate. + * (1) queueToCreate being currently a parent but needs to change to leaf + * (2) queueToCreate being currently a leaf but needs to change to parent + * (3) an existing leaf queue in the ancestry of queueToCreate. * * We will never remove the root queue or the default queue in this way. * - * @return true if we can create leafToCreate or it already exists. + * @return true if we can create queueToCreate or it already exists. 
*/ - private boolean removeEmptyIncompatibleQueues(String leafToCreate) { - leafToCreate = ensureRootPrefix(leafToCreate); + private boolean removeEmptyIncompatibleQueues(String queueToCreate, + FSQueueType queueType) { + queueToCreate = ensureRootPrefix(queueToCreate); - // Ensure leafToCreate is not root and doesn't have the default queue in its + // Ensure queueToCreate is not root and doesn't have the default queue in its // ancestry. - if (leafToCreate.equals(ROOT_QUEUE) || - leafToCreate.startsWith( + if (queueToCreate.equals(ROOT_QUEUE) || + queueToCreate.startsWith( ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) { return false; } - FSQueue queue = queues.get(leafToCreate); + FSQueue queue = queues.get(queueToCreate); // Queue exists already. if (queue != null) { if (queue instanceof FSLeafQueue) { - // If it's an already existing leaf, we're ok. - return true; + if (queueType == FSQueueType.LEAF) { + // if queue is already a leaf then return true + return true; + } + // remove incompatibility since queue is a leaf currently + // needs to change to a parent. + return removeQueueIfEmpty(queue); } else { - // If it's an existing parent queue, remove it if it's empty. + if (queueType == FSQueueType.PARENT) { + return true; + } + // If it's an existing parent queue and needs to change to leaf, + // remove it if it's empty. return removeQueueIfEmpty(queue); } } // Queue doesn't exist already. Check if the new queue would be created // under an existing leaf queue. If so, try removing that leaf queue. 
- int sepIndex = leafToCreate.length(); - sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1); + int sepIndex = queueToCreate.length(); + sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1); while (sepIndex != -1) { - String prefixString = leafToCreate.substring(0, sepIndex); + String prefixString = queueToCreate.substring(0, sepIndex); FSQueue prefixQueue = queues.get(prefixString); if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) { return removeQueueIfEmpty(prefixQueue); } - sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1); + sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1); } return true; } @@ -312,12 +345,21 @@ public class QueueManager { } public void updateAllocationConfiguration(AllocationConfiguration queueConf) { - // Make sure all queues exist - for (String name : queueConf.getQueueNames()) { - if (removeEmptyIncompatibleQueues(name)) { + // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist + for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) { + if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { getLeafQueue(name, true); } } + + // At this point all leaves and 'parents with at least one child' would have been created. + // Now create parents with no configured leaf. 
+ for (String name : queueConf.getConfiguredQueues().get( + FSQueueType.PARENT)) { + if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) { + getParentQueue(name, true); + } + } for (FSQueue queue : queues.values()) { // Update queue metrics @@ -327,7 +369,7 @@ public class QueueManager { // Set scheduling policies try { SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName()); - policy.initialize(scheduler.getClusterCapacity()); + policy.initialize(scheduler.getClusterResource()); queue.setPolicy(policy); } catch (AllocationConfigurationException ex) { LOG.warn("Cannot apply configured scheduling policy to queue " Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java Tue Aug 19 23:49:39 2014 @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Groups; import org.apache.hadoop.util.ReflectionUtils; @@ -32,6 +34,8 @@ import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; +@Private +@Unstable public class QueuePlacementPolicy { private static final Map<String, Class<? extends QueuePlacementRule>> ruleClasses; static { @@ -42,17 +46,19 @@ public class QueuePlacementPolicy { map.put("secondaryGroupExistingQueue", QueuePlacementRule.SecondaryGroupExistingQueue.class); map.put("specified", QueuePlacementRule.Specified.class); + map.put("nestedUserQueue", + QueuePlacementRule.NestedUserQueue.class); map.put("default", QueuePlacementRule.Default.class); map.put("reject", QueuePlacementRule.Reject.class); ruleClasses = Collections.unmodifiableMap(map); } private final List<QueuePlacementRule> rules; - private final Set<String> configuredQueues; + private final Map<FSQueueType, Set<String>> configuredQueues; private final Groups groups; public QueuePlacementPolicy(List<QueuePlacementRule> rules, - Set<String> configuredQueues, Configuration conf) + Map<FSQueueType, Set<String>> configuredQueues, Configuration conf) throws AllocationConfigurationException { for (int i = 0; i < rules.size()-1; i++) { if (rules.get(i).isTerminal()) { @@ -72,28 +78,15 @@ public class QueuePlacementPolicy { /** * Builds a QueuePlacementPolicy from an xml element. 
*/ - public static QueuePlacementPolicy fromXml(Element el, Set<String> configuredQueues, - Configuration conf) throws AllocationConfigurationException { + public static QueuePlacementPolicy fromXml(Element el, + Map<FSQueueType, Set<String>> configuredQueues, Configuration conf) + throws AllocationConfigurationException { List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>(); NodeList elements = el.getChildNodes(); for (int i = 0; i < elements.getLength(); i++) { Node node = elements.item(i); if (node instanceof Element) { - Element element = (Element)node; - - String ruleName = element.getAttribute("name"); - if ("".equals(ruleName)) { - throw new AllocationConfigurationException("No name provided for a " + - "rule element"); - } - - Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName); - if (clazz == null) { - throw new AllocationConfigurationException("No rule class found for " - + ruleName); - } - QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null); - rule.initializeFromXml(element); + QueuePlacementRule rule = createAndInitializeRule(node); rules.add(rule); } } @@ -101,11 +94,37 @@ public class QueuePlacementPolicy { } /** + * Create and initialize a rule given an xml node + * @param node + * @return QueuePlacementRule + * @throws AllocationConfigurationException + */ + public static QueuePlacementRule createAndInitializeRule(Node node) + throws AllocationConfigurationException { + Element element = (Element) node; + + String ruleName = element.getAttribute("name"); + if ("".equals(ruleName)) { + throw new AllocationConfigurationException("No name provided for a " + + "rule element"); + } + + Class<?
extends QueuePlacementRule> clazz = ruleClasses.get(ruleName); + if (clazz == null) { + throw new AllocationConfigurationException("No rule class found for " + + ruleName); + } + QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null); + rule.initializeFromXml(element); + return rule; + } + + /** * Build a simple queue placement policy from the allow-undeclared-pools and * user-as-default-queue configuration options. */ public static QueuePlacementPolicy fromConfiguration(Configuration conf, - Set<String> configuredQueues) { + Map<FSQueueType, Set<String>> configuredQueues) { boolean create = conf.getBoolean( FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
