Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Tue Aug 19 23:49:39 2014 @@ -18,34 +18,101 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import 
org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; -public abstract class AbstractYarnScheduler implements ResourceScheduler { +import com.google.common.util.concurrent.SettableFuture; + + +@SuppressWarnings("unchecked") +public abstract class AbstractYarnScheduler + <T extends SchedulerApplicationAttempt, N extends SchedulerNode> + extends 
AbstractService implements ResourceScheduler { + + private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); + + // Nodes in the cluster, indexed by NodeId + protected Map<NodeId, N> nodes = new ConcurrentHashMap<NodeId, N>(); + + // Whole capacity of the cluster + protected Resource clusterResource = Resource.newInstance(0, 0); + + protected Resource minimumAllocation; + protected Resource maximumAllocation; protected RMContext rmContext; - protected Map<ApplicationId, SchedulerApplication> applications; + protected Map<ApplicationId, SchedulerApplication<T>> applications; + protected int nmExpireInterval; + protected final static List<Container> EMPTY_CONTAINER_LIST = new ArrayList<Container>(); protected static final Allocation EMPTY_ALLOCATION = new Allocation( EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); + /** + * Construct the service. + * + * @param name service name + */ + public AbstractYarnScheduler(String name) { + super(name); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + nmExpireInterval = + conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + createReleaseCache(); + super.serviceInit(conf); + } + public synchronized List<Container> getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); - SchedulerApplication app = applications.get(appId); + SchedulerApplication<T> app = applications.get(appId); List<Container> containerList = new ArrayList<Container>(); RMApp appImpl = this.rmContext.getRMApps().get(appId); if (appImpl.getApplicationSubmissionContext().getUnmanagedAM()) { @@ -64,14 +131,333 @@ public abstract class AbstractYarnSchedu return containerList; } - public Map<ApplicationId, SchedulerApplication> getSchedulerApplications() { + public Map<ApplicationId, SchedulerApplication<T>> + getSchedulerApplications() { return applications; } - + 
+ @Override + public Resource getClusterResource() { + return clusterResource; + } + + @Override + public Resource getMinimumResourceCapability() { + return minimumAllocation; + } + + @Override + public Resource getMaximumResourceCapability() { + return maximumAllocation; + } + + protected void containerLaunchedOnNode(ContainerId containerId, + SchedulerNode node) { + // Get the application for the finished container + SchedulerApplicationAttempt application = getCurrentAttemptForContainer + (containerId); + if (application == null) { + LOG.info("Unknown application " + + containerId.getApplicationAttemptId().getApplicationId() + + " launched container " + containerId + " on node: " + node); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + return; + } + + application.containerLaunchedOnNode(containerId, node.getNodeID()); + } + + public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { + SchedulerApplication<T> app = + applications.get(applicationAttemptId.getApplicationId()); + return app == null ? 
null : app.getCurrentAppAttempt(); + } + + @Override + public SchedulerAppReport getSchedulerAppInfo( + ApplicationAttemptId appAttemptId) { + SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); + if (attempt == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); + } + return null; + } + return new SchedulerAppReport(attempt); + } + + @Override + public ApplicationResourceUsageReport getAppResourceUsageReport( + ApplicationAttemptId appAttemptId) { + SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId); + if (attempt == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Request for appInfo of unknown attempt " + appAttemptId); + } + return null; + } + return attempt.getResourceUsageReport(); + } + + public T getCurrentAttemptForContainer(ContainerId containerId) { + return getApplicationAttempt(containerId.getApplicationAttemptId()); + } + + @Override + public RMContainer getRMContainer(ContainerId containerId) { + SchedulerApplicationAttempt attempt = + getCurrentAttemptForContainer(containerId); + return (attempt == null) ? null : attempt.getRMContainer(containerId); + } + + @Override + public SchedulerNodeReport getNodeReport(NodeId nodeId) { + N node = nodes.get(nodeId); + return node == null ? 
null : new SchedulerNodeReport(node); + } + @Override public String moveApplication(ApplicationId appId, String newQueue) throws YarnException { throw new YarnException(getClass().getSimpleName() + " does not support moving apps between queues"); } + + private void killOrphanContainerOnNode(RMNode node, + NMContainerStatus container) { + if (!container.getContainerState().equals(ContainerState.COMPLETE)) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeCleanContainerEvent(node.getNodeID(), + container.getContainerId())); + } + } + + public synchronized void recoverContainersOnNode( + List<NMContainerStatus> containerReports, RMNode nm) { + if (!rmContext.isWorkPreservingRecoveryEnabled() + || containerReports == null + || (containerReports != null && containerReports.isEmpty())) { + return; + } + + for (NMContainerStatus container : containerReports) { + ApplicationId appId = + container.getContainerId().getApplicationAttemptId().getApplicationId(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.error("Skip recovering container " + container + + " for unknown application."); + killOrphanContainerOnNode(nm, container); + continue; + } + + // Unmanaged AM recovery is addressed in YARN-1815 + if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) { + LOG.info("Skip recovering container " + container + " for unmanaged AM." + + rmApp.getApplicationId()); + killOrphanContainerOnNode(nm, container); + continue; + } + + SchedulerApplication<T> schedulerApp = applications.get(appId); + if (schedulerApp == null) { + LOG.info("Skip recovering container " + container + + " for unknown SchedulerApplication. 
Application current state is " + + rmApp.getState()); + killOrphanContainerOnNode(nm, container); + continue; + } + + LOG.info("Recovering container " + container); + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + + // create container + RMContainer rmContainer = recoverAndCreateContainer(container, nm); + + // recover RMContainer + rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(), + container)); + + // recover scheduler node + nodes.get(nm.getNodeID()).recoverContainer(rmContainer); + + // recover queue: update headroom etc. + Queue queue = schedulerAttempt.getQueue(); + queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer); + + // recover scheduler attempt + schedulerAttempt.recoverContainer(rmContainer); + + // set master container for the current running AMContainer for this + // attempt. + RMAppAttempt appAttempt = rmApp.getCurrentAppAttempt(); + if (appAttempt != null) { + Container masterContainer = appAttempt.getMasterContainer(); + + // Mark current running AMContainer's RMContainer based on the master + // container ID stored in AppAttempt. 
+ if (masterContainer != null + && masterContainer.getId().equals(rmContainer.getContainerId())) { + ((RMContainerImpl)rmContainer).setAMContainer(true); + } + } + + synchronized (schedulerAttempt) { + Set<ContainerId> releases = schedulerAttempt.getPendingRelease(); + if (releases.contains(container.getContainerId())) { + // release the container + rmContainer.handle(new RMContainerFinishedEvent(container + .getContainerId(), SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED)); + releases.remove(container.getContainerId()); + LOG.info(container.getContainerId() + " is released by application."); + } + } + } + } + + private RMContainer recoverAndCreateContainer(NMContainerStatus status, + RMNode node) { + Container container = + Container.newInstance(status.getContainerId(), node.getNodeID(), + node.getHttpAddress(), status.getAllocatedResource(), + status.getPriority(), null); + ApplicationAttemptId attemptId = + container.getId().getApplicationAttemptId(); + RMContainer rmContainer = + new RMContainerImpl(container, attemptId, node.getNodeID(), + applications.get(attemptId.getApplicationId()).getUser(), rmContext, + status.getCreationTime()); + return rmContainer; + } + + /** + * Recover resource request back from RMContainer when a container is + * preempted before AM pulled the same. If container is pulled by + * AM, then RMContainer will not have resource request to recover. + * @param rmContainer + */ + protected void recoverResourceRequestForContainer(RMContainer rmContainer) { + List<ResourceRequest> requests = rmContainer.getResourceRequests(); + + // If container state is moved to ACQUIRED, request will be empty. + if (requests == null) { + return; + } + // Add resource request back to Scheduler. 
+ SchedulerApplicationAttempt schedulerAttempt + = getCurrentAttemptForContainer(rmContainer.getContainerId()); + if (schedulerAttempt != null) { + schedulerAttempt.recoverResourceRequests(requests); + } + } + + protected void createReleaseCache() { + // Cleanup the cache after nm expire interval. + new Timer().schedule(new TimerTask() { + @Override + public void run() { + for (SchedulerApplication<T> app : applications.values()) { + + T attempt = app.getCurrentAppAttempt(); + synchronized (attempt) { + for (ContainerId containerId : attempt.getPendingRelease()) { + RMAuditLogger.logFailure( + app.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", + "Scheduler", + "Trying to release container not owned by app or with invalid id.", + attempt.getApplicationId(), containerId); + } + attempt.getPendingRelease().clear(); + } + } + LOG.info("Release request cache is cleaned up"); + } + }, nmExpireInterval); + } + + // clean up a completed container + protected abstract void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event); + + protected void releaseContainers(List<ContainerId> containers, + SchedulerApplicationAttempt attempt) { + for (ContainerId containerId : containers) { + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() + < nmExpireInterval) { + LOG.info(containerId + " doesn't exist. 
Add the container" + + " to the release request cache as it maybe on recovery."); + synchronized (attempt) { + attempt.getPendingRelease().add(containerId); + } + } else { + RMAuditLogger.logFailure(attempt.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", "Scheduler", + "Trying to release container not owned by app or with invalid id.", + attempt.getApplicationId(), containerId); + } + } + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus(containerId, + SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); + } + } + + public SchedulerNode getSchedulerNode(NodeId nodeId) { + return nodes.get(nodeId); + } + + @Override + public synchronized void moveAllApps(String sourceQueue, String destQueue) + throws YarnException { + // check if destination queue is a valid leaf queue + try { + getQueueInfo(destQueue, false, false); + } catch (IOException e) { + LOG.warn(e); + throw new YarnException(e); + } + // check if source queue is a valid + List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue); + if (apps == null) { + String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist"; + LOG.warn(errMsg); + throw new YarnException(errMsg); + } + // generate move events for each pending/running app + for (ApplicationAttemptId app : apps) { + SettableFuture<Object> future = SettableFuture.create(); + this.rmContext + .getDispatcher() + .getEventHandler() + .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); + } + } + + @Override + public synchronized void killAllAppsInQueue(String queueName) + throws YarnException { + // check if queue is a valid + List<ApplicationAttemptId> apps = getAppsInQueue(queueName); + if (apps == null) { + String errMsg = "The specified Queue: " + queueName + " doesn't exist"; + LOG.warn(errMsg); + throw new YarnException(errMsg); + } + // generate kill events for each pending/running app + for (ApplicationAttemptId app : apps) { + 
this.rmContext + .getDispatcher() + .getEventHandler() + .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL)); + } + } }
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Tue Aug 19 23:49:39 2014 @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -52,10 +54,13 @@ public class AppSchedulingInfo { private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); private final ApplicationAttemptId applicationAttemptId; final ApplicationId applicationId; - private final String queueName; + private String queueName; Queue queue; final String user; - private final AtomicInteger containerIdCounter = new AtomicInteger(0); + // TODO making containerIdCounter long + private final 
AtomicInteger containerIdCounter; + private final int EPOCH_BIT_MASK = 0x3ff; + private final int EPOCH_BIT_SHIFT = 22; final Set<Priority> priorities = new TreeSet<Priority>( new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); @@ -68,15 +73,19 @@ public class AppSchedulingInfo { /* Allocated by scheduler */ boolean pending = true; // for app metrics - + + public AppSchedulingInfo(ApplicationAttemptId appAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager) { + String user, Queue queue, ActiveUsersManager activeUsersManager, + int epoch) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; this.queueName = queue.getQueueName(); this.user = user; this.activeUsersManager = activeUsersManager; + this.containerIdCounter = new AtomicInteger( + (epoch & EPOCH_BIT_MASK) << EPOCH_BIT_SHIFT); } public ApplicationId getApplicationId() { @@ -118,9 +127,10 @@ public class AppSchedulingInfo { * by the application. * * @param requests resources to be acquired + * @param recoverPreemptedRequest recover Resource Request on preemption */ synchronized public void updateResourceRequests( - List<ResourceRequest> requests) { + List<ResourceRequest> requests, boolean recoverPreemptedRequest) { QueueMetrics metrics = queue.getMetrics(); // Update resource requests @@ -154,8 +164,13 @@ public class AppSchedulingInfo { asks = new HashMap<String, ResourceRequest>(); this.requests.put(priority, asks); this.priorities.add(priority); - } else if (updatePendingResources) { - lastRequest = asks.get(resourceName); + } + lastRequest = asks.get(resourceName); + + if (recoverPreemptedRequest && lastRequest != null) { + // Increment the number of containers by 1, as it is recovering a + // single container.
+ request.setNumContainers(lastRequest.getNumContainers() + 1); } asks.put(resourceName, request); @@ -245,14 +260,16 @@ public class AppSchedulingInfo { * @param container * the containers allocated. */ - synchronized public void allocate(NodeType type, SchedulerNode node, - Priority priority, ResourceRequest request, Container container) { + synchronized public List<ResourceRequest> allocate(NodeType type, + SchedulerNode node, Priority priority, ResourceRequest request, + Container container) { + List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>(); if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(node, priority, request, container); + allocateNodeLocal(node, priority, request, container, resourceRequests); } else if (type == NodeType.RACK_LOCAL) { - allocateRackLocal(node, priority, request, container); + allocateRackLocal(node, priority, request, container, resourceRequests); } else { - allocateOffSwitch(node, priority, request, container); + allocateOffSwitch(node, priority, request, container, resourceRequests); } QueueMetrics metrics = queue.getMetrics(); if (pending) { @@ -270,6 +287,7 @@ public class AppSchedulingInfo { + " resource=" + request.getCapability()); } metrics.allocateResources(user, 1, request.getCapability(), true); + return resourceRequests; } /** @@ -279,9 +297,9 @@ public class AppSchedulingInfo { * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateNodeLocal( - SchedulerNode node, Priority priority, - ResourceRequest nodeLocalRequest, Container container) { + synchronized private void allocateNodeLocal(SchedulerNode node, + Priority priority, ResourceRequest nodeLocalRequest, Container container, + List<ResourceRequest> resourceRequests) { // Update future requirements nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1); if (nodeLocalRequest.getNumContainers() == 0) { @@ -295,7 +313,14 @@ public class AppSchedulingInfo { 
this.requests.get(priority).remove(node.getRackName()); } - decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY)); + ResourceRequest offRackRequest = requests.get(priority).get( + ResourceRequest.ANY); + decrementOutstanding(offRackRequest); + + // Update cloned NodeLocal, RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); } /** @@ -305,16 +330,22 @@ public class AppSchedulingInfo { * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateRackLocal( - SchedulerNode node, Priority priority, - ResourceRequest rackLocalRequest, Container container) { + synchronized private void allocateRackLocal(SchedulerNode node, + Priority priority, ResourceRequest rackLocalRequest, Container container, + List<ResourceRequest> resourceRequests) { // Update future requirements rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1); if (rackLocalRequest.getNumContainers() == 0) { this.requests.get(priority).remove(node.getRackName()); } - decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY)); + ResourceRequest offRackRequest = requests.get(priority).get( + ResourceRequest.ANY); + decrementOutstanding(offRackRequest); + + // Update cloned RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); } /** @@ -324,11 +355,13 @@ public class AppSchedulingInfo { * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateOffSwitch( - SchedulerNode node, Priority priority, - ResourceRequest offSwitchRequest, Container container) { + synchronized private void allocateOffSwitch(SchedulerNode node, + Priority priority, ResourceRequest offSwitchRequest, 
Container container, + List<ResourceRequest> resourceRequests) { // Update future requirements decrementOutstanding(offSwitchRequest); + // Update cloned OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(offSwitchRequest)); } synchronized private void decrementOutstanding( @@ -377,6 +410,7 @@ public class AppSchedulingInfo { activeUsersManager = newQueue.getActiveUsersManager(); activeUsersManager.activateApplication(user, applicationId); this.queue = newQueue; + this.queueName = newQueue.getQueueName(); } synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { @@ -409,4 +443,29 @@ public class AppSchedulingInfo { // this.requests = appInfo.getRequests(); this.blacklist = appInfo.getBlackList(); } + + public synchronized void recoverContainer(RMContainer rmContainer) { + QueueMetrics metrics = queue.getMetrics(); + if (pending) { + // If there was any container to recover, the application was + // running from scheduler's POV. + pending = false; + metrics.runAppAttempt(applicationId, user); + } + + // Container is completed. Skip recovering resources. 
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + + metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), + false); + } + + public ResourceRequest cloneResourceRequest(ResourceRequest request) { + ResourceRequest newRequest = ResourceRequest.newInstance( + request.getPriority(), request.getResourceName(), + request.getCapability(), 1, request.getRelaxLocality()); + return newRequest; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java Tue Aug 19 23:49:39 2014 @@ -26,6 +26,8 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @Evolving @LimitedPrivate("yarn") @@ -60,4 +62,13 @@ public interface Queue { boolean hasAccess(QueueACL acl, UserGroupInformation user); public ActiveUsersManager 
getActiveUsersManager(); + + /** + * Recover the state of the queue for a given container. + * @param clusterResource the resource of the cluster + * @param schedulerAttempt the application for which the container was allocated + * @param rmContainer the container that was recovered. + */ + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java Tue Aug 19 23:49:39 2014 @@ -34,6 +34,15 @@ import org.apache.hadoop.yarn.server.res @LimitedPrivate("yarn") @Evolving public interface ResourceScheduler extends YarnScheduler, Recoverable { + + /** + * Set RMContext for <code>ResourceScheduler</code>. + * This method should be called immediately after instantiating + * a scheduler once. + * @param rmContext created by ResourceManager + */ + void setRMContext(RMContext rmContext); + /** * Re-initialize the <code>ResourceScheduler</code>. 
* @param conf configuration Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java Tue Aug 19 23:49:39 2014 @@ -23,11 +23,11 @@ import org.apache.hadoop.yarn.server.res @Private @Unstable -public class SchedulerApplication { +public class SchedulerApplication<T extends SchedulerApplicationAttempt> { private Queue queue; private final String user; - private SchedulerApplicationAttempt currentAttempt; + private T currentAttempt; public SchedulerApplication(Queue queue, String user) { this.queue = queue; @@ -46,11 +46,11 @@ public class SchedulerApplication { return user; } - public SchedulerApplicationAttempt getCurrentAppAttempt() { + public T getCurrentAppAttempt() { return currentAttempt; } - public void setCurrentAppAttempt(SchedulerApplicationAttempt currentAttempt) { + public void setCurrentAppAttempt(T currentAttempt) { this.currentAttempt = currentAttempt; } Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Tue Aug 19 23:49:39 2014 @@ -20,9 +20,11 @@ package org.apache.hadoop.yarn.server.re import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,6 +34,7 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NMToken; @@ -46,9 +49,11 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; @@ -76,10 +81,20 @@ public class SchedulerApplicationAttempt protected final Resource currentReservation = Resource.newInstance(0, 0); private Resource resourceLimit = Resource.newInstance(0, 0); protected Resource currentConsumption = Resource.newInstance(0, 0); + private Resource amResource; + private boolean unmanagedAM = true; + private boolean amRunning = false; protected List<RMContainer> newlyAllocatedContainers = new ArrayList<RMContainer>(); + // This pendingRelease is used in work-preserving recovery scenario to keep + // track of the AM's outstanding release requests. RM on recovery could + // receive the release request from AM before it receives the container status + // from NM for recovery. In this case, the to-be-recovered containers reported + // by NM should not be recovered. 
+ private Set<ContainerId> pendingRelease = null; + /** * Count how many times the application has been given an opportunity * to schedule a task at each priority. Each time the scheduler @@ -101,11 +116,23 @@ public class SchedulerApplicationAttempt public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, RMContext rmContext) { + Preconditions.checkNotNull("RMContext should not be null", rmContext); this.rmContext = rmContext; this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager); + activeUsersManager, rmContext.getEpoch()); this.queue = queue; + this.pendingRelease = new HashSet<ContainerId>(); + if (rmContext.getRMApps() != null && + rmContext.getRMApps() + .containsKey(applicationAttemptId.getApplicationId())) { + ApplicationSubmissionContext appSubmissionContext = + rmContext.getRMApps().get(applicationAttemptId.getApplicationId()) + .getApplicationSubmissionContext(); + if (appSubmissionContext != null) { + unmanagedAM = appSubmissionContext.getUnmanagedAM(); + } + } } /** @@ -144,6 +171,10 @@ public class SchedulerApplicationAttempt return appSchedulingInfo.getResourceRequests(priority); } + public Set<ContainerId> getPendingRelease() { + return this.pendingRelease; + } + public int getNewContainerId() { return appSchedulingInfo.getNewContainerId(); } @@ -168,6 +199,26 @@ public class SchedulerApplicationAttempt return appSchedulingInfo.getQueueName(); } + public Resource getAMResource() { + return amResource; + } + + public void setAMResource(Resource amResource) { + this.amResource = amResource; + } + + public boolean isAmRunning() { + return amRunning; + } + + public void setAmRunning(boolean bool) { + amRunning = bool; + } + + public boolean getUnmanagedAM() { + return unmanagedAM; + } + public synchronized RMContainer getRMContainer(ContainerId id) { return liveContainers.get(id); } @@ -202,7 +253,14 @@ public class 
SchedulerApplicationAttempt public synchronized void updateResourceRequests( List<ResourceRequest> requests) { if (!isStopped) { - appSchedulingInfo.updateResourceRequests(requests); + appSchedulingInfo.updateResourceRequests(requests, false); + } + } + + public synchronized void recoverResourceRequests( + List<ResourceRequest> requests) { + if (!isStopped) { + appSchedulingInfo.updateResourceRequests(requests, true); } } @@ -378,7 +436,8 @@ public class SchedulerApplicationAttempt // create container token and NMToken altogether. container.setContainerToken(rmContext.getContainerTokenSecretManager() .createContainerToken(container.getId(), container.getNodeId(), - getUser(), container.getResource())); + getUser(), container.getResource(), container.getPriority(), + rmContainer.getCreationTime())); NMToken nmToken = rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(), getApplicationAttemptId(), container); @@ -499,5 +558,24 @@ public class SchedulerApplicationAttempt appSchedulingInfo.move(newQueue); this.queue = newQueue; - } + } + + public synchronized void recoverContainer(RMContainer rmContainer) { + // recover app scheduling info + appSchedulingInfo.recoverContainer(rmContainer); + + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + LOG.info("SchedulerAttempt " + getApplicationAttemptId() + + " is recovering container " + rmContainer.getContainerId()); + liveContainers.put(rmContainer.getContainerId(), rmContainer); + Resources.addTo(currentConsumption, rmContainer.getContainer() + .getResource()); + // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource + // is called. 
+ // newlyAllocatedContainers.add(rmContainer); + // schedulingOpportunities + // lastScheduledContainer + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java Tue Aug 19 23:49:39 2014 @@ -18,11 +18,26 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.util.resource.Resources; + /** * Represents a YARN Cluster Node from the viewpoint of the scheduler. @@ -31,59 +46,236 @@ import org.apache.hadoop.yarn.conf.YarnC @Unstable public abstract class SchedulerNode { + private static final Log LOG = LogFactory.getLog(SchedulerNode.class); + + private Resource availableResource = Resource.newInstance(0, 0); + private Resource usedResource = Resource.newInstance(0, 0); + private Resource totalResourceCapability; + private RMContainer reservedContainer; + private volatile int numContainers; + + + /* set of containers that are allocated containers */ + private final Map<ContainerId, RMContainer> launchedContainers = + new HashMap<ContainerId, RMContainer>(); + + private final RMNode rmNode; + private final String nodeName; + + public SchedulerNode(RMNode node, boolean usePortForNodeName) { + this.rmNode = node; + this.availableResource = Resources.clone(node.getTotalCapability()); + this.totalResourceCapability = Resources.clone(node.getTotalCapability()); + if (usePortForNodeName) { + nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); + } else { + nodeName = rmNode.getHostName(); + } + } + + public RMNode getRMNode() { + return this.rmNode; + } + + /** + * Get the ID of the node which contains both its hostname and port. + * + * @return the ID of the node + */ + public NodeId getNodeID() { + return this.rmNode.getNodeID(); + } + + public String getHttpAddress() { + return this.rmNode.getHttpAddress(); + } + /** * Get the name of the node for scheduling matching decisions. 
* <p/> - * Typically this is the 'hostname' reported by the node, but it could be - * configured to be 'hostname:port' reported by the node via the + * Typically this is the 'hostname' reported by the node, but it could be + * configured to be 'hostname:port' reported by the node via the * {@link YarnConfiguration#RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME} constant. * The main usecase of this is Yarn minicluster to be able to differentiate * node manager instances by their port number. * * @return name of the node for scheduling matching decisions. */ - public abstract String getNodeName(); - + public String getNodeName() { + return nodeName; + } + /** * Get rackname. + * * @return rackname */ - public abstract String getRackName(); - + public String getRackName() { + return this.rmNode.getRackName(); + } + /** - * Get used resources on the node. - * @return used resources on the node + * The Scheduler has allocated containers on this node to the given + * application. + * + * @param rmContainer + * allocated container */ - public abstract Resource getUsedResource(); + public synchronized void allocateContainer(RMContainer rmContainer) { + Container container = rmContainer.getContainer(); + deductAvailableResource(container.getResource()); + ++numContainers; + + launchedContainers.put(container.getId(), rmContainer); + + LOG.info("Assigned container " + container.getId() + " of capacity " + + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which has " + numContainers + " containers, " + + getUsedResource() + " used and " + getAvailableResource() + + " available after allocation"); + } /** * Get available resources on the node. + * * @return available resources on the node */ - public abstract Resource getAvailableResource(); + public synchronized Resource getAvailableResource() { + return this.availableResource; + } /** - * Get number of active containers on the node. 
- * @return number of active containers on the node - */ - public abstract int getNumContainers(); - - /** - * Apply delta resource on node's available resource. - * @param deltaResource the delta of resource need to apply to node + * Get used resources on the node. + * + * @return used resources on the node */ - public abstract void applyDeltaOnAvailableResource(Resource deltaResource); + public synchronized Resource getUsedResource() { + return this.usedResource; + } /** * Get total resources on the node. + * * @return total resources on the node. */ - public abstract Resource getTotalResource(); - + public Resource getTotalResource() { + return this.totalResourceCapability; + } + + public synchronized boolean isValidContainer(ContainerId containerId) { + if (launchedContainers.containsKey(containerId)) { + return true; + } + return false; + } + + private synchronized void updateResource(Container container) { + addAvailableResource(container.getResource()); + --numContainers; + } + /** - * Get the ID of the node which contains both its hostname and port. - * @return the ID of the node + * Release an allocated container on this node. 
+ * + * @param container + * container to be released + */ + public synchronized void releaseContainer(Container container) { + if (!isValidContainer(container.getId())) { + LOG.error("Invalid container released " + container); + return; + } + + /* remove the containers from the nodemanager */ + if (null != launchedContainers.remove(container.getId())) { + updateResource(container); + } + + LOG.info("Released container " + container.getId() + " of capacity " + + container.getResource() + " on host " + rmNode.getNodeAddress() + + ", which currently has " + numContainers + " containers, " + + getUsedResource() + " used and " + getAvailableResource() + + " available" + ", release resources=" + true); + } + + private synchronized void addAvailableResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid resource addition of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.addTo(availableResource, resource); + Resources.subtractFrom(usedResource, resource); + } + + private synchronized void deductAvailableResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.subtractFrom(availableResource, resource); + Resources.addTo(usedResource, resource); + } + + /** + * Reserve container for the attempt on this node. + */ + public abstract void reserveResource(SchedulerApplicationAttempt attempt, + Priority priority, RMContainer container); + + /** + * Unreserve resources on this node. */ - public abstract NodeId getNodeID(); + public abstract void unreserveResource(SchedulerApplicationAttempt attempt); + + @Override + public String toString() { + return "host: " + rmNode.getNodeAddress() + " #containers=" + + getNumContainers() + " available=" + + getAvailableResource().getMemory() + " used=" + + getUsedResource().getMemory(); + } + + /** + * Get number of active containers on the node. 
+ * + * @return number of active containers on the node + */ + public int getNumContainers() { + return numContainers; + } + + public synchronized List<RMContainer> getRunningContainers() { + return new ArrayList<RMContainer>(launchedContainers.values()); + } + + public synchronized RMContainer getReservedContainer() { + return reservedContainer; + } + + protected synchronized void + setReservedContainer(RMContainer reservedContainer) { + this.reservedContainer = reservedContainer; + } + + /** + * Apply delta resource on node's available resource. + * + * @param deltaResource + * the delta of resource need to apply to node + */ + public synchronized void + applyDeltaOnAvailableResource(Resource deltaResource) { + // we can only adjust available resource if total resource is changed. + Resources.addTo(this.availableResource, deltaResource); + } + + public synchronized void recoverContainer(RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + allocateContainer(rmContainer); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original) +++ 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Tue Aug 19 23:49:39 2014 @@ -153,14 +153,17 @@ public class SchedulerUtils { * @param rmNode RMNode with new resource view * @param clusterResource the cluster's resource that need to update * @param log Scheduler's log for resource change + * @return true if the resources have changed */ - public static void updateResourceIfChanged(SchedulerNode node, + public static boolean updateResourceIfChanged(SchedulerNode node, RMNode rmNode, Resource clusterResource, Log log) { + boolean result = false; Resource oldAvailableResource = node.getAvailableResource(); Resource newAvailableResource = Resources.subtract( rmNode.getTotalCapability(), node.getUsedResource()); if (!newAvailableResource.equals(oldAvailableResource)) { + result = true; Resource deltaResource = Resources.subtract(newAvailableResource, oldAvailableResource); // Reflect resource change to scheduler node. 
@@ -176,6 +179,8 @@ public class SchedulerUtils { + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: " + deltaResource.getMemory() +"MB"); } + + return result; } /** Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Tue Aug 19 23:49:39 2014 @@ -69,7 +69,15 @@ public interface YarnScheduler extends E @Public @Stable public List<QueueUserACLInfo> getQueueUserAclInfo(); - + + /** + * Get the whole resource capacity of the cluster. + * @return the whole resource capacity of the cluster. + */ + @LimitedPrivate("yarn") + @Unstable + public Resource getClusterResource(); + /** * Get minimum allocatable {@link Resource}. 
* @return minimum allocatable resource @@ -182,7 +190,7 @@ public interface YarnScheduler extends E @LimitedPrivate("yarn") @Unstable public RMContainer getRMContainer(ContainerId containerId); - + /** * Moves the given application to the given queue * @param appId @@ -194,4 +202,22 @@ public interface YarnScheduler extends E @Evolving public String moveApplication(ApplicationId appId, String newQueue) throws YarnException; + + /** + * Completely drain sourceQueue of applications, by moving all of them to + * destQueue. + * + * @param sourceQueue + * @param destQueue + * @throws YarnException + */ + void moveAllApps(String sourceQueue, String destQueue) throws YarnException; + + /** + * Terminate all applications in the specified queue. + * + * @param queueName the name of queue to be drained + * @throws YarnException + */ + void killAllAppsInQueue(String queueName) throws YarnException; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Tue Aug 19 23:49:39 2014 @@ -28,7 +28,6 @@ import 
org.apache.hadoop.security.Access import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; @@ -235,17 +234,26 @@ extends org.apache.hadoop.yarn.server.re public ActiveUsersManager getActiveUsersManager(); /** - * Recover the state of the queue - * @param clusterResource the resource of the cluster - * @param application the application for which the container was allocated - * @param container the container that was recovered. - */ - public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, - Container container); - - /** * Adds all applications in the queue and its subqueues to the given collection. * @param apps the collection to add the applications to */ public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps); + + /** + * Detach a container from this queue + * @param clusterResource the current cluster resource + * @param application application to which the container was assigned + * @param container the container to detach + */ + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer container); + + /** + * Attach a container to this queue + * @param clusterResource the current cluster resource + * @param application application to which the container was assigned + * @param container the container to attach + */ + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer container); } Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Tue Aug 19 23:49:39 2014 @@ -17,6 +17,9 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -24,6 +27,8 @@ import org.apache.hadoop.yarn.util.resou class CSQueueUtils { + private static final Log LOG = LogFactory.getLog(CSQueueUtils.class); + final static float EPSILON = 0.0001f; public static void checkMaxCapacity(String queueName, @@ -113,4 +118,52 @@ class CSQueueUtils { ) ); } + + public static float getAbsoluteMaxAvailCapacity( + ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) { + CSQueue parent = queue.getParent(); + if (parent == null) { + return queue.getAbsoluteMaximumCapacity(); + } + + //Get my parent's max 
avail, needed to determine my own + float parentMaxAvail = getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, parent); + //...and as a resource + Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail); + + //check for no resources parent before dividing, if so, max avail is none + if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) { + return 0.0f; + } + //sibling used is parent used - my used... + float siblingUsedCapacity = Resources.ratio( + resourceCalculator, + Resources.subtract(parent.getUsedResources(), queue.getUsedResources()), + parentResource); + //my max avail is the lesser of my max capacity and what is unused from my parent + //by my siblings (if they are beyond their base capacity) + float maxAvail = Math.min( + queue.getMaximumCapacity(), + 1.0f - siblingUsedCapacity); + //and, multiply by parent to get absolute (cluster relative) value + float absoluteMaxAvail = maxAvail * parentMaxAvail; + + if (LOG.isDebugEnabled()) { + LOG.debug("qpath " + queue.getQueuePath()); + LOG.debug("parentMaxAvail " + parentMaxAvail); + LOG.debug("siblingUsedCapacity " + siblingUsedCapacity); + LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity()); + LOG.debug("maxAvail " + maxAvail); + LOG.debug("absoluteMaxAvail " + absoluteMaxAvail); + } + + if (absoluteMaxAvail < 0.0f) { + absoluteMaxAvail = 0.0f; + } else if (absoluteMaxAvail > 1.0f) { + absoluteMaxAvail = 1.0f; + } + + return absoluteMaxAvail; + } }
