Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Fri Jul 25 20:33:09 2014 @@ -20,9 +20,8 @@ package org.apache.hadoop.yarn.server.re import java.util.ArrayList; import java.util.EnumSet; -import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -105,8 +104,8 @@ public class RMNodeImpl implements RMNod private String nodeManagerVersion; /* set of containers that have just launched */ - private final Map<ContainerId, ContainerStatus> justLaunchedContainers = - new HashMap<ContainerId, ContainerStatus>(); + private final Set<ContainerId> launchedContainers = + new HashSet<ContainerId>(); /* set of containers that need to be cleaned */ private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>( @@ -476,6 +475,13 @@ public class RMNodeImpl implements RMNod // Increment activeNodes explicitly because this is a new node. ClusterMetrics.getMetrics().incrNumActiveNodes(); containers = startEvent.getNMContainerStatuses(); + if (containers != null && !containers.isEmpty()) { + for (NMContainerStatus container : containers) { + if (container.getContainerState() == ContainerState.RUNNING) { + rmNode.launchedContainers.add(container.getContainerId()); + } + } + } } if (null != startEvent.getRunningApplications()) { @@ -664,14 +670,14 @@ public class RMNodeImpl implements RMNod // Process running containers if (remoteContainer.getState() == ContainerState.RUNNING) { - if (!rmNode.justLaunchedContainers.containsKey(containerId)) { + if (!rmNode.launchedContainers.contains(containerId)) { // Just launched container. RM knows about it the first time. - rmNode.justLaunchedContainers.put(containerId, remoteContainer); + rmNode.launchedContainers.add(containerId); newlyLaunchedContainers.add(remoteContainer); } } else { // A finished container - rmNode.justLaunchedContainers.remove(containerId); + rmNode.launchedContainers.remove(containerId); completedContainers.add(remoteContainer); } } @@ -748,4 +754,10 @@ public class RMNodeImpl implements RMNod public int getQueueSize() { return nodeUpdateQueue.size(); } + + // For test only. + @VisibleForTesting + public Set<ContainerId> getLaunchedContainers() { + return this.launchedContainers; + } }
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Fri Jul 25 20:33:09 2014 @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -122,6 +123,23 @@ public abstract class AbstractYarnSchedu return maximumAllocation; } + protected void containerLaunchedOnNode(ContainerId containerId, + SchedulerNode node) { + // Get the application for the finished container + SchedulerApplicationAttempt application = getCurrentAttemptForContainer + (containerId); + if (application == null) { + LOG.info("Unknown application " + + containerId.getApplicationAttemptId().getApplicationId() + + " launched container " + containerId + " on node: " + node); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); + return; + } + + application.containerLaunchedOnNode(containerId, node.getNodeID()); + } + public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { SchedulerApplication<T> app = applications.get(applicationAttemptId.getApplicationId()); @@ -275,6 +293,27 @@ public abstract class AbstractYarnSchedu return rmContainer; } + /** + * Recover resource request back from RMContainer when a container is + * preempted before AM pulled the same. If container is pulled by + * AM, then RMContainer will not have resource request to recover. + * @param rmContainer + */ + protected void recoverResourceRequestForContainer(RMContainer rmContainer) { + List<ResourceRequest> requests = rmContainer.getResourceRequests(); + + // If container state is moved to ACQUIRED, request will be empty. + if (requests == null) { + return; + } + // Add resource request back to Scheduler. + SchedulerApplicationAttempt schedulerAttempt + = getCurrentAttemptForContainer(rmContainer.getContainerId()); + if (schedulerAttempt != null) { + schedulerAttempt.recoverResourceRequests(requests); + } + } + public SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Fri Jul 25 20:33:09 2014 @@ -127,9 +127,10 @@ public class AppSchedulingInfo { * by the application. * * @param requests resources to be acquired + * @param recoverPreemptedRequest recover Resource Request on preemption */ synchronized public void updateResourceRequests( - List<ResourceRequest> requests) { + List<ResourceRequest> requests, boolean recoverPreemptedRequest) { QueueMetrics metrics = queue.getMetrics(); // Update resource requests @@ -163,8 +164,13 @@ public class AppSchedulingInfo { asks = new HashMap<String, ResourceRequest>(); this.requests.put(priority, asks); this.priorities.add(priority); - } else if (updatePendingResources) { - lastRequest = asks.get(resourceName); + } + lastRequest = asks.get(resourceName); + + if (recoverPreemptedRequest && lastRequest != null) { + // Increment the number of containers to 1, as it is recovering a + // single container. + request.setNumContainers(lastRequest.getNumContainers() + 1); } asks.put(resourceName, request); @@ -254,14 +260,16 @@ public class AppSchedulingInfo { * @param container * the containers allocated. */ - synchronized public void allocate(NodeType type, SchedulerNode node, - Priority priority, ResourceRequest request, Container container) { + synchronized public List<ResourceRequest> allocate(NodeType type, + SchedulerNode node, Priority priority, ResourceRequest request, + Container container) { + List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>(); if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(node, priority, request, container); + allocateNodeLocal(node, priority, request, container, resourceRequests); } else if (type == NodeType.RACK_LOCAL) { - allocateRackLocal(node, priority, request, container); + allocateRackLocal(node, priority, request, container, resourceRequests); } else { - allocateOffSwitch(node, priority, request, container); + allocateOffSwitch(node, priority, request, container, resourceRequests); } QueueMetrics metrics = queue.getMetrics(); if (pending) { @@ -279,6 +287,7 @@ public class AppSchedulingInfo { + " resource=" + request.getCapability()); } metrics.allocateResources(user, 1, request.getCapability(), true); + return resourceRequests; } /** @@ -288,9 +297,9 @@ public class AppSchedulingInfo { * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateNodeLocal( - SchedulerNode node, Priority priority, - ResourceRequest nodeLocalRequest, Container container) { + synchronized private void allocateNodeLocal(SchedulerNode node, + Priority priority, ResourceRequest nodeLocalRequest, Container container, + List<ResourceRequest> resourceRequests) { // Update future requirements nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1); if (nodeLocalRequest.getNumContainers() == 0) { @@ -304,7 +313,14 @@ public class AppSchedulingInfo { this.requests.get(priority).remove(node.getRackName()); } - decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY)); + ResourceRequest offRackRequest = requests.get(priority).get( + ResourceRequest.ANY); + decrementOutstanding(offRackRequest); + + // Update cloned NodeLocal, RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(nodeLocalRequest)); + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); } /** @@ -314,16 +330,22 @@ public class AppSchedulingInfo { * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateRackLocal( - SchedulerNode node, Priority priority, - ResourceRequest rackLocalRequest, Container container) { + synchronized private void allocateRackLocal(SchedulerNode node, + Priority priority, ResourceRequest rackLocalRequest, Container container, + List<ResourceRequest> resourceRequests) { // Update future requirements rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1); if (rackLocalRequest.getNumContainers() == 0) { this.requests.get(priority).remove(node.getRackName()); } - decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY)); + ResourceRequest offRackRequest = requests.get(priority).get( + ResourceRequest.ANY); + decrementOutstanding(offRackRequest); + + // Update cloned RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(rackLocalRequest)); + resourceRequests.add(cloneResourceRequest(offRackRequest)); } /** @@ -333,11 +355,13 @@ public class AppSchedulingInfo { * @param allocatedContainers * resources allocated to the application */ - synchronized private void allocateOffSwitch( - SchedulerNode node, Priority priority, - ResourceRequest offSwitchRequest, Container container) { + synchronized private void allocateOffSwitch(SchedulerNode node, + Priority priority, ResourceRequest offSwitchRequest, Container container, + List<ResourceRequest> resourceRequests) { // Update future requirements decrementOutstanding(offSwitchRequest); + // Update cloned RackLocal and OffRack requests for recovery + resourceRequests.add(cloneResourceRequest(offSwitchRequest)); } synchronized private void decrementOutstanding( @@ -436,4 +460,11 @@ public class AppSchedulingInfo { metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), false); } + + public ResourceRequest cloneResourceRequest(ResourceRequest request) { + ResourceRequest newRequest = ResourceRequest.newInstance( + request.getPriority(), request.getResourceName(), + request.getCapability(), 1, request.getRelaxLocality()); + return newRequest; + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Fri Jul 25 20:33:09 2014 @@ -241,7 +241,14 @@ public class SchedulerApplicationAttempt public synchronized void updateResourceRequests( List<ResourceRequest> requests) { if (!isStopped) { - appSchedulingInfo.updateResourceRequests(requests); + appSchedulingInfo.updateResourceRequests(requests, false); + } + } + + public synchronized void recoverResourceRequests( + List<ResourceRequest> requests) { + if (!isStopped) { + appSchedulingInfo.updateResourceRequests(requests, true); } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Fri Jul 25 20:33:09 2014 @@ -69,7 +69,6 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; @@ -521,7 +520,7 @@ public class CapacityScheduler extends } private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { + String queueName, String user, boolean isAppRecovering) { // santiy checks. CSQueue queue = getQueue(queueName); if (queue == null) { @@ -553,14 +552,20 @@ public class CapacityScheduler extends applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); - rmContext.getDispatcher().getEventHandler() + if (isAppRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } else { + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } } private synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, - boolean shouldNotifyAttemptAdded) { + boolean isAttemptRecovering) { SchedulerApplication<FiCaSchedulerApp> application = applications.get(applicationAttemptId.getApplicationId()); CSQueue queue = (CSQueue) application.getQueue(); @@ -578,14 +583,15 @@ public class CapacityScheduler extends LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - if (shouldNotifyAttemptAdded) { - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); - } else { + if (isAttemptRecovering) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping notifying ATTEMPT_ADDED"); + LOG.debug(applicationAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); } + } else { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); } } @@ -859,21 +865,6 @@ public class CapacityScheduler extends } - private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { - // Get the application for the finished container - FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - @Override public void handle(SchedulerEvent event) { switch(event.getType()) { @@ -905,7 +896,8 @@ public class CapacityScheduler extends { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); } break; case APP_REMOVED: @@ -921,7 +913,7 @@ public class CapacityScheduler extends (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), - appAttemptAddedEvent.getShouldNotifyAttemptAdded()); + appAttemptAddedEvent.getIsAttemptRecovering()); } break; case APP_ATTEMPT_REMOVED: @@ -1089,6 +1081,7 @@ public class CapacityScheduler extends if (LOG.isDebugEnabled()) { LOG.debug("KILL_CONTAINER: container" + cont.toString()); } + recoverResourceRequestForContainer(cont); completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Fri Jul 25 20:33:09 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends Sc if (null == liveContainers.remove(rmContainer.getContainerId())) { return false; } + + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); @@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends Sc liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations - appSchedulingInfo.allocate(type, node, priority, request, container); + List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate( + type, node, priority, request, container); Resources.addTo(currentConsumption, container.getResource()); + + // Update resource requests related to "request" and store in RMContainer + ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList); // Inform the container rmContainer.handle( Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java Fri Jul 25 20:33:09 2014 @@ -25,13 +25,20 @@ public class AppAddedSchedulerEvent exte private final ApplicationId applicationId; private final String queue; private final String user; + private final boolean isAppRecovering; public AppAddedSchedulerEvent( ApplicationId applicationId, String queue, String user) { + this(applicationId, queue, user, false); + } + + public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, + String user, boolean isAppRecovering) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; this.user = user; + this.isAppRecovering = isAppRecovering; } public ApplicationId getApplicationId() { @@ -46,4 +53,7 @@ public class AppAddedSchedulerEvent exte return user; } + public boolean getIsAppRecovering() { + return isAppRecovering; + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAttemptAddedSchedulerEvent.java Fri Jul 25 20:33:09 2014 @@ -24,22 +24,22 @@ public class AppAttemptAddedSchedulerEve private final ApplicationAttemptId applicationAttemptId; private final boolean transferStateFromPreviousAttempt; - private final boolean shouldNotifyAttemptAdded; + private final boolean isAttemptRecovering; public AppAttemptAddedSchedulerEvent( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt) { - this(applicationAttemptId, transferStateFromPreviousAttempt, true); + this(applicationAttemptId, transferStateFromPreviousAttempt, false); } public AppAttemptAddedSchedulerEvent( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, - boolean shouldNotifyAttemptAdded) { + boolean isAttemptRecovering) { super(SchedulerEventType.APP_ATTEMPT_ADDED); this.applicationAttemptId = applicationAttemptId; this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; - this.shouldNotifyAttemptAdded = shouldNotifyAttemptAdded; + this.isAttemptRecovering = isAttemptRecovering; } public ApplicationAttemptId getApplicationAttemptId() { @@ -50,7 +50,7 @@ public class AppAttemptAddedSchedulerEve return transferStateFromPreviousAttempt; } - public boolean getShouldNotifyAttemptAdded() { - return shouldNotifyAttemptAdded; + public boolean getIsAttemptRecovering() { + return isAttemptRecovering; } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Fri Jul 25 20:33:09 2014 @@ -224,16 +224,17 @@ public class FSLeafQueue extends FSQueue @Override public RMContainer preemptContainer() { RMContainer toBePreempted = null; - if (LOG.isDebugEnabled()) { - LOG.debug("Queue " + getName() + " is going to preempt a container " + - "from its applications."); - } // If this queue is not over its fair share, reject if (!preemptContainerPreCheck()) { return toBePreempted; } + if (LOG.isDebugEnabled()) { + LOG.debug("Queue " + getName() + " is going to preempt a container " + + "from its applications."); + } + // Choose the app that is most over fair share Comparator<Schedulable> comparator = policy.getComparator(); AppSchedulable candidateSched = null; @@ -328,4 +329,14 @@ public class FSLeafQueue extends FSQueue SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { // TODO Auto-generated method stub } + + /** + * Helper method to check if the queue should preempt containers + * + * @return true if check passes (can preempt) or false otherwise + */ + private boolean preemptContainerPreCheck() { + return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(), + getFairShare()); + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Jul 25 20:33:09 2014 @@ -164,11 +164,6 @@ public class FSParentQueue extends FSQue public RMContainer preemptContainer() { RMContainer toBePreempted = null; - // If this queue is not over its fair share, reject - if (!preemptContainerPreCheck()) { - return toBePreempted; - } - // Find the childQueue which is most over fair share FSQueue candidateQueue = null; Comparator<Schedulable> comparator = policy.getComparator(); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Fri Jul 25 20:33:09 2014 @@ -187,17 +187,4 @@ public abstract class FSQueue extends Sc } return true; } - - /** - * Helper method to check if the queue should preempt containers - * - * @return true if check passes (can preempt) or false otherwise - */ - protected boolean preemptContainerPreCheck() { - if (this == scheduler.getQueueManager().getRootQueue()) { - return true; - } - return parent.getPolicy() - .checkIfUsageOverFairShare(getResourceUsage(), getFairShare()); - } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Fri Jul 25 20:33:09 2014 @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; @@ -82,6 +83,9 @@ public class FSSchedulerApp extends Sche Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); + // Inform the container rmContainer.handle( new RMContainerFinishedEvent( @@ -281,9 +285,13 @@ public class FSSchedulerApp extends Sche liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations - appSchedulingInfo.allocate(type, node, priority, request, container); + List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate( + type, node, priority, request, container); Resources.addTo(currentConsumption, container.getResource()); + // Update resource requests related to "request" and store in RMContainer + ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); + // Inform the container rmContainer.handle( new RMContainerEvent(container.getId(), RMContainerEventType.START)); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Jul 25 20:33:09 2014 @@ -135,7 +135,9 @@ public class FairScheduler extends public static final Resource CONTAINER_RESERVED = Resources.createResource(-1); // How often fair shares are re-calculated (ms) - protected long UPDATE_INTERVAL = 500; + protected long updateInterval; + private final int UPDATE_DEBUG_FREQUENCY = 5; + private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; private Thread updateThread; private Thread schedulingThread; @@ -242,13 +244,13 @@ public class FairScheduler extends /** * A runnable which calls {@link FairScheduler#update()} every - * <code>UPDATE_INTERVAL</code> milliseconds. + * <code>updateInterval</code> milliseconds. */ private class UpdateThread implements Runnable { public void run() { while (true) { try { - Thread.sleep(UPDATE_INTERVAL); + Thread.sleep(updateInterval); update(); preemptTasksIfNecessary(); } catch (Exception e) { @@ -275,6 +277,18 @@ public class FairScheduler extends // Recursively compute fair shares for all queues // and update metrics rootQueue.recomputeShares(); + + if (LOG.isDebugEnabled()) { + if (--updatesToSkipForDebug < 0) { + updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY; + LOG.debug("Cluster Capacity: " + clusterResource + + " Allocations: " + rootMetrics.getAllocatedResources() + + " Availability: " + Resource.newInstance( + rootMetrics.getAvailableMB(), + rootMetrics.getAvailableVirtualCores()) + + " Demand: " + rootQueue.getDemand()); + } + } } /** @@ -408,7 +422,7 @@ public class FairScheduler extends } } - private void warnOrKillContainer(RMContainer container) { + protected void warnOrKillContainer(RMContainer container) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); FSSchedulerApp app = getSchedulerApp(appAttemptId); FSLeafQueue queue = app.getQueue(); @@ -426,6 +440,7 @@ public class FairScheduler extends SchedulerUtils.createPreemptedContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); + recoverResourceRequestForContainer(container); // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). completedContainer(container, status, RMContainerEventType.KILL); @@ -551,7 +566,7 @@ public class FairScheduler extends * configured limits, but the app will not be marked as runnable. */ protected synchronized void addApplication(ApplicationId applicationId, - String queueName, String user) { + String queueName, String user, boolean isAppRecovering) { if (queueName == null || queueName.isEmpty()) { String message = "Reject application " + applicationId + " submitted by user " + user + " with an empty queue name."; @@ -588,8 +603,14 @@ public class FairScheduler extends LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() + if (isAppRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } else { + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } } /** @@ -598,7 +619,7 @@ public class FairScheduler extends protected synchronized void addApplicationAttempt( ApplicationAttemptId applicationAttemptId, boolean transferStateFromPreviousAttempt, - boolean shouldNotifyAttemptAdded) { + boolean isAttemptRecovering) { SchedulerApplication<FSSchedulerApp> application = applications.get(applicationAttemptId.getApplicationId()); String user = application.getUser(); @@ -627,14 +648,15 @@ public class FairScheduler extends LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); - if (shouldNotifyAttemptAdded) { - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); - } else { + if (isAttemptRecovering) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping notifying ATTEMPT_ADDED"); + LOG.debug(applicationAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); } + } else { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(applicationAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); } } @@ -879,14 +901,14 @@ public class FairScheduler extends // Update application requests application.updateResourceRequests(ask); - LOG.debug("allocate: post-update"); application.showRequests(); } if (LOG.isDebugEnabled()) { - LOG.debug("allocate:" + + LOG.debug("allocate: post-update" + " applicationAttemptId=" + appAttemptId + - " #ask=" + ask.size()); + " #ask=" + ask.size() + + " reservation= " + application.getCurrentReservation()); LOG.debug("Preempting " + application.getPreemptionContainers().size() + " container(s)"); @@ -907,22 +929,6 @@ public class FairScheduler extends } /** - * Process a container which has launched on a node, as reported by the node. - */ - private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { - // Get the application for the finished container - FSSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - - /** * Process a heartbeat update from a node. */ private synchronized void nodeUpdate(RMNode nm) { @@ -964,37 +970,27 @@ public class FairScheduler extends } } - private void continuousScheduling() { - while (true) { - List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet()); - // Sort the nodes by space available on them, so that we offer - // containers on emptier nodes first, facilitating an even spread. This - // requires holding the scheduler lock, so that the space available on a - // node doesn't change during the sort. - synchronized (this) { - Collections.sort(nodeIdList, nodeAvailableResourceComparator); - } - - // iterate all nodes - for (NodeId nodeId : nodeIdList) { - if (nodes.containsKey(nodeId)) { - FSSchedulerNode node = getFSSchedulerNode(nodeId); - try { - if (Resources.fitsIn(minimumAllocation, - node.getAvailableResource())) { - attemptScheduling(node); - } - } catch (Throwable ex) { - LOG.warn("Error while attempting scheduling for node " + node + - ": " + ex.toString(), ex); - } - } - } + void continuousSchedulingAttempt() { + List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet()); + // Sort the nodes by space available on them, so that we offer + // containers on emptier nodes first, facilitating an even spread. This + // requires holding the scheduler lock, so that the space available on a + // node doesn't change during the sort. + synchronized (this) { + Collections.sort(nodeIdList, nodeAvailableResourceComparator); + } + + // iterate all nodes + for (NodeId nodeId : nodeIdList) { + FSSchedulerNode node = getFSSchedulerNode(nodeId); try { - Thread.sleep(getContinuousSchedulingSleepMs()); - } catch (InterruptedException e) { - LOG.warn("Error while doing sleep in continuous scheduling: " + - e.toString(), e); + if (node != null && Resources.fitsIn(minimumAllocation, + node.getAvailableResource())) { + attemptScheduling(node); + } + } catch (Throwable ex) { + LOG.error("Error while attempting scheduling for node " + node + + ": " + ex.toString(), ex); } } } @@ -1004,6 +1000,12 @@ public class FairScheduler extends @Override public int compare(NodeId n1, NodeId n2) { + if (!nodes.containsKey(n1)) { + return 1; + } + if (!nodes.containsKey(n2)) { + return -1; + } return RESOURCE_CALCULATOR.compare(clusterResource, nodes.get(n2).getAvailableResource(), nodes.get(n1).getAvailableResource()); @@ -1121,7 +1123,8 @@ public class FairScheduler extends } AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); break; case APP_REMOVED: if (!(event instanceof AppRemovedSchedulerEvent)) { @@ -1139,7 +1142,7 @@ public class FairScheduler extends (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), - appAttemptAddedEvent.getShouldNotifyAttemptAdded()); + appAttemptAddedEvent.getIsAttemptRecovering()); break; case APP_ATTEMPT_REMOVED: if (!(event instanceof AppAttemptRemovedSchedulerEvent)) { @@ -1203,6 +1206,15 @@ public class FairScheduler extends waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); usePortForNodeName = this.conf.getUsePortForNodeName(); + updateInterval = this.conf.getUpdateInterval(); + if (updateInterval < 0) { + updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS; + LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS + + " is invalid, so using default value " + + + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS + + " ms instead"); + } + rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); // This stores per-application scheduling information this.applications = @@ -1227,7 +1239,16 @@ public class FairScheduler extends new Runnable() { @Override public void run() { - continuousScheduling(); + while (!Thread.currentThread().isInterrupted()) { + try { + continuousSchedulingAttempt(); + Thread.sleep(getContinuousSchedulingSleepMs()); + } catch (InterruptedException e) { + LOG.error("Continuous scheduling thread interrupted. Exiting. ", + e); + return; + } + } } } ); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Fri Jul 25 20:33:09 2014 @@ -123,6 +123,11 @@ public class FairSchedulerConfiguration protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign"; protected static final int DEFAULT_MAX_ASSIGN = -1; + /** The update interval for calculating resources in FairScheduler .*/ + public static final String UPDATE_INTERVAL_MS = + CONF_PREFIX + "update-interval-ms"; + public static final int DEFAULT_UPDATE_INTERVAL_MS = 500; + public FairSchedulerConfiguration() { super(); } @@ -246,6 +251,10 @@ public class FairSchedulerConfiguration "Error reading resource config", ex); } } + + public long getUpdateInterval() { + return getLong(UPDATE_INTERVAL_MS, DEFAULT_UPDATE_INTERVAL_MS); + } private static int findResource(String val, String units) throws AllocationConfigurationException { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Fri Jul 25 20:33:09 2014 @@ -65,6 +65,7 @@ public class FairSharePolicy extends Sch private static class FairShareComparator implements Comparator<Schedulable>, Serializable { private static final long serialVersionUID = 5564969375856699313L; + private static final Resource ONE = Resources.createResource(1); @Override public int compare(Schedulable s1, Schedulable s2) { @@ -78,11 +79,10 @@ public class FairSharePolicy extends Sch s1.getResourceUsage(), minShare1); boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null, s2.getResourceUsage(), minShare2); - Resource one = Resources.createResource(1); minShareRatio1 = (double) s1.getResourceUsage().getMemory() - / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory(); + / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory(); minShareRatio2 = (double) s2.getResourceUsage().getMemory() - / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory(); + / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory(); useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeights().getWeight(ResourceType.MEMORY); useToWeightRatio2 = s2.getResourceUsage().getMemory() / Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Jul 25 20:33:09 2014 @@ -66,7 +66,6 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; @@ -356,22 +355,28 @@ public class FifoScheduler extends @VisibleForTesting public synchronized void addApplication(ApplicationId applicationId, - String queue, String user) { + String queue, String user, boolean isAppRecovering) { SchedulerApplication<FiCaSchedulerApp> application = new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user); applications.put(applicationId, application); metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() + if (isAppRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } else { + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } } @VisibleForTesting public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, boolean transferStateFromPreviousAttempt, - boolean shouldNotifyAttemptAdded) { + boolean isAttemptRecovering) { SchedulerApplication<FiCaSchedulerApp> application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); @@ -389,14 +394,15 @@ public class FifoScheduler extends metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); - if (shouldNotifyAttemptAdded) { - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(appAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); - } else { + if (isAttemptRecovering) { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping notifying ATTEMPT_ADDED"); + LOG.debug(appAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); } + } else { + rmContext.getDispatcher().getEventHandler().handle( + new RMAppAttemptEvent(appAttemptId, + RMAppAttemptEventType.ATTEMPT_ADDED)); } } @@ -772,7 +778,8 @@ public class FifoScheduler extends { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); } break; case APP_REMOVED: @@ -788,7 +795,7 @@ public class FifoScheduler extends (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), - appAttemptAddedEvent.getShouldNotifyAttemptAdded()); + appAttemptAddedEvent.getIsAttemptRecovering()); } break; case APP_ATTEMPT_REMOVED: @@ -823,23 +830,6 @@ public class FifoScheduler extends } } - private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { - // Get the application for the finished container - FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - // Some unknown container sneaked into the system. Kill it. - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); - - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - @Lock(FifoScheduler.class) private synchronized void containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java Fri Jul 25 20:33:09 2014 @@ -19,22 +19,28 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.security.SecureRandom; +import java.util.HashSet; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; - -import javax.crypto.SecretKey; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.security.MasterKeyData; + +import com.google.common.annotations.VisibleForTesting; /** * AMRM-tokens are per ApplicationAttempt. If users redistribute their @@ -49,40 +55,66 @@ public class AMRMTokenSecretManager exte private static final Log LOG = LogFactory .getLog(AMRMTokenSecretManager.class); - private SecretKey masterKey; + private int serialNo = new SecureRandom().nextInt(); + private MasterKeyData nextMasterKey; + private MasterKeyData currentMasterKey; + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + private final Timer timer; private final long rollingInterval; + private final long activationDelay; - private final Map<ApplicationAttemptId, byte[]> passwords = - new HashMap<ApplicationAttemptId, byte[]>(); + private final Set<ApplicationAttemptId> appAttemptSet = + new HashSet<ApplicationAttemptId>(); /** * Create an {@link AMRMTokenSecretManager} */ public AMRMTokenSecretManager(Configuration conf) { - rollMasterKey(); this.timer = new Timer(); this.rollingInterval = conf .getLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000; + // Adding delay = 1.5 * expiry interval makes sure that all active AMs get + // the updated shared-key. + this.activationDelay = + (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5); + LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval + + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms"); + if (rollingInterval <= activationDelay * 2) { + throw new IllegalArgumentException( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS + + " should be more than 2 X " + + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS); + } } public void start() { - this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval); + if (this.currentMasterKey == null) { + this.currentMasterKey = createNewMasterKey(); + } + this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval, + rollingInterval); } public void stop() { this.timer.cancel(); } - public synchronized void applicationMasterFinished( - ApplicationAttemptId appAttemptId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Application finished, removing password for " + appAttemptId); + public void applicationMasterFinished(ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); + try { + LOG.info("Application finished, removing password for " + appAttemptId); + this.appAttemptSet.remove(appAttemptId); + } finally { + this.writeLock.unlock(); } - this.passwords.remove(appAttemptId); } private class MasterKeyRoller extends TimerTask { @@ -93,49 +125,89 @@ public class AMRMTokenSecretManager exte } @Private - public synchronized void setMasterKey(SecretKey masterKey) { - this.masterKey = masterKey; + void rollMasterKey() { + this.writeLock.lock(); + try { + LOG.info("Rolling master-key for amrm-tokens"); + this.nextMasterKey = createNewMasterKey(); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } finally { + this.writeLock.unlock(); + } } - @Private - public synchronized SecretKey getMasterKey() { - return this.masterKey; + private class NextKeyActivator extends TimerTask { + @Override + public void run() { + activateNextMasterKey(); + } + } + + public void activateNextMasterKey() { + this.writeLock.lock(); + try { + LOG.info("Activating next master key with id: " + + this.nextMasterKey.getMasterKey().getKeyId()); + this.currentMasterKey = this.nextMasterKey; + this.nextMasterKey = null; + } finally { + this.writeLock.unlock(); + } } @Private - synchronized void rollMasterKey() { - LOG.info("Rolling master-key for amrm-tokens"); - this.masterKey = generateSecret(); + @VisibleForTesting + public MasterKeyData createNewMasterKey() { + this.writeLock.lock(); + try { + return new MasterKeyData(serialNo++, generateSecret()); + } finally { + this.writeLock.unlock(); + } } - /** - * Create a password for a given {@link AMRMTokenIdentifier}. Used to - * send to the AppicationAttempt which can give it back during authentication. - */ - @Override - public synchronized byte[] createPassword( - AMRMTokenIdentifier identifier) { - ApplicationAttemptId applicationAttemptId = - identifier.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Creating password for " + applicationAttemptId); - } - byte[] password = createPassword(identifier.getBytes(), masterKey); - this.passwords.put(applicationAttemptId, password); - return password; + public Token<AMRMTokenIdentifier> createAndGetAMRMToken( + ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); + try { + LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId); + AMRMTokenIdentifier identifier = + new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey() + .getKeyId()); + byte[] password = this.createPassword(identifier); + appAttemptSet.add(appAttemptId); + return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password, + identifier.getKind(), new Text()); + } finally { + this.writeLock.unlock(); + } + } + + // If nextMasterKey is not Null, then return nextMasterKey + // otherwise return currentMasterKey + @VisibleForTesting + public MasterKeyData getMasterKey() { + this.readLock.lock(); + try { + return nextMasterKey == null ? currentMasterKey : nextMasterKey; + } finally { + this.readLock.unlock(); + } } /** * Populate persisted password of AMRMToken back to AMRMTokenSecretManager. */ - public synchronized void - addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException { - AMRMTokenIdentifier identifier = token.decodeIdentifier(); - if (LOG.isDebugEnabled()) { + public void addPersistedPassword(Token<AMRMTokenIdentifier> token) + throws IOException { + this.writeLock.lock(); + try { + AMRMTokenIdentifier identifier = token.decodeIdentifier(); LOG.debug("Adding password for " + identifier.getApplicationAttemptId()); + appAttemptSet.add(identifier.getApplicationAttemptId()); + } finally { + this.writeLock.unlock(); } - this.passwords.put(identifier.getApplicationAttemptId(), - token.getPassword()); } /** @@ -143,19 +215,35 @@ public class AMRMTokenSecretManager exte * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}. */ @Override - public synchronized byte[] retrievePassword( - AMRMTokenIdentifier identifier) throws InvalidToken { - ApplicationAttemptId applicationAttemptId = - identifier.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to retrieve password for " + applicationAttemptId); - } - byte[] password = this.passwords.get(applicationAttemptId); - if (password == null) { - throw new InvalidToken("Password not found for ApplicationAttempt " - + applicationAttemptId); + public byte[] retrievePassword(AMRMTokenIdentifier identifier) + throws InvalidToken { + this.readLock.lock(); + try { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to retrieve password for " + applicationAttemptId); + } + if (!appAttemptSet.contains(applicationAttemptId)) { + throw new InvalidToken("Password not found for ApplicationAttempt " + + applicationAttemptId); + } + if (identifier.getKeyId() == this.currentMasterKey.getMasterKey() + .getKeyId()) { + return createPassword(identifier.getBytes(), + this.currentMasterKey.getSecretKey()); + } else if (nextMasterKey != null + && identifier.getKeyId() == this.nextMasterKey.getMasterKey() + .getKeyId()) { + return createPassword(identifier.getBytes(), + this.nextMasterKey.getSecretKey()); + } + throw new InvalidToken("Given AMRMToken for application : " + + applicationAttemptId.toString() + + " seems to have been generated illegally."); + } finally { + this.readLock.unlock(); } - return password; } /** @@ -167,4 +255,40 @@ public class AMRMTokenSecretManager exte return new AMRMTokenIdentifier(); } + @Private + @VisibleForTesting + public MasterKeyData getCurrnetMasterKeyData() { + this.readLock.lock(); + try { + return this.currentMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Private + @VisibleForTesting + public MasterKeyData getNextMasterKeyData() { + this.readLock.lock(); + try { + return this.nextMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Override + @Private + protected byte[] createPassword(AMRMTokenIdentifier identifier) { + this.readLock.lock(); + try { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + LOG.info("Creating password for " + applicationAttemptId); + return createPassword(identifier.getBytes(), getMasterKey() + .getSecretKey()); + } finally { + this.readLock.unlock(); + } + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Fri Jul 25 20:33:09 2014 @@ -388,7 +388,11 @@ public class DelegationTokenRenewer exte // If user provides incorrect token then it should not be added for // renewal. for (DelegationTokenToRenew dtr : tokenList) { - renewToken(dtr); + try { + renewToken(dtr); + } catch (IOException ioe) { + throw new IOException("Failed to renew token: " + dtr.token, ioe); + } } for (DelegationTokenToRenew dtr : tokenList) { addTokenToList(dtr); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java Fri Jul 25 20:33:09 2014 @@ -29,8 +29,10 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -193,4 +195,14 @@ public class RMDelegationTokenSecretMana addPersistedDelegationToken(entry.getKey(), entry.getValue()); } } + + public long getRenewDate(RMDelegationTokenIdentifier ident) + throws InvalidToken { + DelegationTokenInformation info = currentTokens.get(ident); + if (info == null) { + throw new InvalidToken("token (" + ident.toString() + + ") can't be found in cache"); + } + return info.getRenewDate(); + } }
