http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a30f2f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 9e5a807..3555faa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -123,65 +123,72 @@ public class FSAppAttempt extends SchedulerApplicationAttempt return queue.getMetrics(); } - synchronized public void containerCompleted(RMContainer rmContainer, + public void containerCompleted(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { - - Container container = rmContainer.getContainer(); - ContainerId containerId = container.getId(); - - // Remove from the list of newly allocated containers if found - newlyAllocatedContainers.remove(rmContainer); - - // Inform the container - rmContainer.handle( - new RMContainerFinishedEvent( - containerId, - containerStatus, - event) - ); - if (LOG.isDebugEnabled()) { - LOG.debug("Completed container: " + rmContainer.getContainerId() + - " in state: " + rmContainer.getState() + " event:" + event); - } + try { + writeLock.lock(); + Container container = rmContainer.getContainer(); + ContainerId 
containerId = container.getId(); + + // Remove from the list of newly allocated containers if found + newlyAllocatedContainers.remove(rmContainer); + + // Inform the container + rmContainer.handle( + new RMContainerFinishedEvent(containerId, containerStatus, event)); + if (LOG.isDebugEnabled()) { + LOG.debug("Completed container: " + rmContainer.getContainerId() + + " in state: " + rmContainer.getState() + " event:" + event); + } + + // Remove from the list of containers + liveContainers.remove(rmContainer.getContainerId()); - // Remove from the list of containers - liveContainers.remove(rmContainer.getContainerId()); + Resource containerResource = rmContainer.getContainer().getResource(); + RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER, + "SchedulerApp", getApplicationId(), containerId, containerResource); - Resource containerResource = rmContainer.getContainer().getResource(); - RMAuditLogger.logSuccess(getUser(), - AuditConstants.RELEASE_CONTAINER, "SchedulerApp", - getApplicationId(), containerId, containerResource); - - // Update usage metrics - queue.getMetrics().releaseResources(getUser(), 1, containerResource); - this.attemptResourceUsage.decUsed(containerResource); + // Update usage metrics + queue.getMetrics().releaseResources(getUser(), 1, containerResource); + this.attemptResourceUsage.decUsed(containerResource); - // remove from preemption map if it is completed - preemptionMap.remove(rmContainer); + // remove from preemption map if it is completed + preemptionMap.remove(rmContainer); - // Clear resource utilization metrics cache. - lastMemoryAggregateAllocationUpdateTime = -1; + // Clear resource utilization metrics cache. 
+ lastMemoryAggregateAllocationUpdateTime = -1; + } finally { + writeLock.unlock(); + } } - private synchronized void unreserveInternal( + private void unreserveInternal( SchedulerRequestKey schedulerKey, FSSchedulerNode node) { - Map<NodeId, RMContainer> reservedContainers = - this.reservedContainers.get(schedulerKey); - RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); - if (reservedContainers.isEmpty()) { - this.reservedContainers.remove(schedulerKey); - } - - // Reset the re-reservation count - resetReReservations(schedulerKey); + try { + writeLock.lock(); + Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get( + schedulerKey); + RMContainer reservedContainer = reservedContainers.remove( + node.getNodeID()); + if (reservedContainers.isEmpty()) { + this.reservedContainers.remove(schedulerKey); + } + + // Reset the re-reservation count + resetReReservations(schedulerKey); - Resource resource = reservedContainer.getContainer().getResource(); - this.attemptResourceUsage.decReserved(resource); + Resource resource = reservedContainer.getContainer().getResource(); + this.attemptResourceUsage.decReserved(resource); - LOG.info("Application " + getApplicationId() + " unreserved " + " on node " - + node + ", currently has " + reservedContainers.size() - + " at priority " + schedulerKey.getPriority() + "; currentReservation " - + this.attemptResourceUsage.getReserved()); + LOG.info( + "Application " + getApplicationId() + " unreserved " + " on node " + + node + ", currently has " + reservedContainers.size() + + " at priority " + schedulerKey.getPriority() + + "; currentReservation " + this.attemptResourceUsage + .getReserved()); + } finally { + writeLock.unlock(); + } } private void subtractResourcesOnBlacklistedNodes( @@ -239,17 +246,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt return headroom; } - public synchronized float getLocalityWaitFactor( - SchedulerRequestKey schedulerKey, int clusterNodes) { - 
// Estimate: Required unique resources (i.e. hosts + racks) - int requiredResources = - Math.max(this.getResourceRequests(schedulerKey).size() - 1, 0); - - // waitFactor can't be more than '1' - // i.e. no point skipping more than clustersize opportunities - return Math.min(((float)requiredResources / clusterNodes), 1.0f); - } - /** * Return the level at which we are allowed to schedule containers, given the * current size of the cluster and thresholds indicating how many nodes to @@ -261,44 +257,56 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * @param rackLocalityThreshold rackLocalityThreshold * @return NodeType */ - public synchronized NodeType getAllowedLocalityLevel( + NodeType getAllowedLocalityLevel( SchedulerRequestKey schedulerKey, int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) { // upper limit on threshold - if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; } - if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; } + if (nodeLocalityThreshold > 1.0) { + nodeLocalityThreshold = 1.0; + } + if (rackLocalityThreshold > 1.0) { + rackLocalityThreshold = 1.0; + } // If delay scheduling is not being used, can schedule anywhere if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) { return NodeType.OFF_SWITCH; } - // Default level is NODE_LOCAL - if (!allowedLocalityLevel.containsKey(schedulerKey)) { - allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL); - return NodeType.NODE_LOCAL; - } - - NodeType allowed = allowedLocalityLevel.get(schedulerKey); + try { + writeLock.lock(); - // If level is already most liberal, we're done - if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH; + // Default level is NODE_LOCAL + if (!allowedLocalityLevel.containsKey(schedulerKey)) { + allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; + } - double threshold = allowed.equals(NodeType.NODE_LOCAL) ? 
nodeLocalityThreshold : - rackLocalityThreshold; + NodeType allowed = allowedLocalityLevel.get(schedulerKey); - // Relax locality constraints once we've surpassed threshold. - if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) { - if (allowed.equals(NodeType.NODE_LOCAL)) { - allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL); - resetSchedulingOpportunities(schedulerKey); + // If level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) { + return NodeType.OFF_SWITCH; } - else if (allowed.equals(NodeType.RACK_LOCAL)) { - allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); - resetSchedulingOpportunities(schedulerKey); + + double threshold = allowed.equals(NodeType.NODE_LOCAL) ? + nodeLocalityThreshold : + rackLocalityThreshold; + + // Relax locality constraints once we've surpassed threshold. + if (getSchedulingOpportunities(schedulerKey) > (numNodes * threshold)) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(schedulerKey); + } else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(schedulerKey); + } } + return allowedLocalityLevel.get(schedulerKey); + } finally { + writeLock.unlock(); } - return allowedLocalityLevel.get(schedulerKey); } /** @@ -311,119 +319,131 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * @param currentTimeMs currentTimeMs * @return NodeType */ - public synchronized NodeType getAllowedLocalityLevelByTime( + NodeType getAllowedLocalityLevelByTime( SchedulerRequestKey schedulerKey, long nodeLocalityDelayMs, long rackLocalityDelayMs, long currentTimeMs) { - // if not being used, can schedule anywhere if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) { return NodeType.OFF_SWITCH; } - // default level is NODE_LOCAL - if (!allowedLocalityLevel.containsKey(schedulerKey)) { - // add the 
initial time of priority to prevent comparing with FsApp - // startTime and allowedLocalityLevel degrade - lastScheduledContainer.put(schedulerKey, currentTimeMs); - if (LOG.isDebugEnabled()) { - LOG.debug("Init the lastScheduledContainer time, priority: " - + schedulerKey.getPriority() + ", time: " + currentTimeMs); + try { + writeLock.lock(); + + // default level is NODE_LOCAL + if (!allowedLocalityLevel.containsKey(schedulerKey)) { + // add the initial time of priority to prevent comparing with FsApp + // startTime and allowedLocalityLevel degrade + lastScheduledContainer.put(schedulerKey, currentTimeMs); + if (LOG.isDebugEnabled()) { + LOG.debug( + "Init the lastScheduledContainer time, priority: " + schedulerKey + .getPriority() + ", time: " + currentTimeMs); + } + allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL); + return NodeType.NODE_LOCAL; } - allowedLocalityLevel.put(schedulerKey, NodeType.NODE_LOCAL); - return NodeType.NODE_LOCAL; - } - NodeType allowed = allowedLocalityLevel.get(schedulerKey); + NodeType allowed = allowedLocalityLevel.get(schedulerKey); - // if level is already most liberal, we're done - if (allowed.equals(NodeType.OFF_SWITCH)) { - return NodeType.OFF_SWITCH; - } - - // check waiting time - long waitTime = currentTimeMs; - if (lastScheduledContainer.containsKey(schedulerKey)) { - waitTime -= lastScheduledContainer.get(schedulerKey); - } else { - waitTime -= getStartTime(); - } + // if level is already most liberal, we're done + if (allowed.equals(NodeType.OFF_SWITCH)) { + return NodeType.OFF_SWITCH; + } - long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ? 
- nodeLocalityDelayMs : rackLocalityDelayMs; + // check waiting time + long waitTime = currentTimeMs; + if (lastScheduledContainer.containsKey(schedulerKey)) { + waitTime -= lastScheduledContainer.get(schedulerKey); + } else{ + waitTime -= getStartTime(); + } - if (waitTime > thresholdTime) { - if (allowed.equals(NodeType.NODE_LOCAL)) { - allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL); - resetSchedulingOpportunities(schedulerKey, currentTimeMs); - } else if (allowed.equals(NodeType.RACK_LOCAL)) { - allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); - resetSchedulingOpportunities(schedulerKey, currentTimeMs); + long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ? + nodeLocalityDelayMs : + rackLocalityDelayMs; + + if (waitTime > thresholdTime) { + if (allowed.equals(NodeType.NODE_LOCAL)) { + allowedLocalityLevel.put(schedulerKey, NodeType.RACK_LOCAL); + resetSchedulingOpportunities(schedulerKey, currentTimeMs); + } else if (allowed.equals(NodeType.RACK_LOCAL)) { + allowedLocalityLevel.put(schedulerKey, NodeType.OFF_SWITCH); + resetSchedulingOpportunities(schedulerKey, currentTimeMs); + } } + return allowedLocalityLevel.get(schedulerKey); + } finally { + writeLock.unlock(); } - return allowedLocalityLevel.get(schedulerKey); } - synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, + public RMContainer allocate(NodeType type, FSSchedulerNode node, SchedulerRequestKey schedulerKey, ResourceRequest request, Container reservedContainer) { - // Update allowed locality level - NodeType allowed = allowedLocalityLevel.get(schedulerKey); - if (allowed != null) { - if (allowed.equals(NodeType.OFF_SWITCH) && - (type.equals(NodeType.NODE_LOCAL) || - type.equals(NodeType.RACK_LOCAL))) { - this.resetAllowedLocalityLevel(schedulerKey, type); + RMContainer rmContainer; + Container container; + + try { + writeLock.lock(); + // Update allowed locality level + NodeType allowed = allowedLocalityLevel.get(schedulerKey); + if (allowed != 
null) { + if (allowed.equals(NodeType.OFF_SWITCH) && (type.equals( + NodeType.NODE_LOCAL) || type.equals(NodeType.RACK_LOCAL))) { + this.resetAllowedLocalityLevel(schedulerKey, type); + } else if (allowed.equals(NodeType.RACK_LOCAL) && type.equals( + NodeType.NODE_LOCAL)) { + this.resetAllowedLocalityLevel(schedulerKey, type); + } } - else if (allowed.equals(NodeType.RACK_LOCAL) && - type.equals(NodeType.NODE_LOCAL)) { - this.resetAllowedLocalityLevel(schedulerKey, type); + + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getTotalRequiredResources(schedulerKey) <= 0) { + return null; } - } - // Required sanity check - AM can call 'allocate' to update resource - // request without locking the scheduler, hence we need to check - if (getTotalRequiredResources(schedulerKey) <= 0) { - return null; - } + container = reservedContainer; + if (container == null) { + container = createContainer(node, request.getCapability(), + schedulerKey); + } - Container container = reservedContainer; - if (container == null) { - container = - createContainer(node, request.getCapability(), schedulerKey); - } - - // Create RMContainer - RMContainer rmContainer = new RMContainerImpl(container, - getApplicationAttemptId(), node.getNodeID(), - appSchedulingInfo.getUser(), rmContext); - ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName()); + // Create RMContainer + rmContainer = new RMContainerImpl(container, + getApplicationAttemptId(), node.getNodeID(), + appSchedulingInfo.getUser(), rmContext); + ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName()); - // Add it to allContainers list. - newlyAllocatedContainers.add(rmContainer); - liveContainers.put(container.getId(), rmContainer); + // Add it to allContainers list. 
+ newlyAllocatedContainers.add(rmContainer); + liveContainers.put(container.getId(), rmContainer); - // Update consumption and track allocations - List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate( - type, node, schedulerKey, request, container); - this.attemptResourceUsage.incUsed(container.getResource()); + // Update consumption and track allocations + List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate( + type, node, schedulerKey, request, container); + this.attemptResourceUsage.incUsed(container.getResource()); - // Update resource requests related to "request" and store in RMContainer - ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); + // Update resource requests related to "request" and store in RMContainer + ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList); - // Inform the container - rmContainer.handle( - new RMContainerEvent(container.getId(), RMContainerEventType.START)); + // Inform the container + rmContainer.handle( + new RMContainerEvent(container.getId(), RMContainerEventType.START)); + + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + container.getId() + .getApplicationAttemptId() + " container=" + container.getId() + + " host=" + container.getNodeId().getHost() + " type=" + type); + } + RMAuditLogger.logSuccess(getUser(), AuditConstants.ALLOC_CONTAINER, + "SchedulerApp", getApplicationId(), container.getId(), + container.getResource()); + } finally { + writeLock.unlock(); + } - if (LOG.isDebugEnabled()) { - LOG.debug("allocate: applicationAttemptId=" - + container.getId().getApplicationAttemptId() - + " container=" + container.getId() + " host=" - + container.getNodeId().getHost() + " type=" + type); - } - RMAuditLogger.logSuccess(getUser(), - AuditConstants.ALLOC_CONTAINER, "SchedulerApp", - getApplicationId(), container.getId(), container.getResource()); - return rmContainer; } @@ -434,19 +454,30 @@ public class FSAppAttempt 
extends SchedulerApplicationAttempt * @param schedulerKey Scheduler Key * @param level NodeType */ - public synchronized void resetAllowedLocalityLevel( + public void resetAllowedLocalityLevel( SchedulerRequestKey schedulerKey, NodeType level) { - NodeType old = allowedLocalityLevel.get(schedulerKey); - LOG.info("Raising locality level from " + old + " to " + level + " at " + - " priority " + schedulerKey.getPriority()); - allowedLocalityLevel.put(schedulerKey, level); + NodeType old; + try { + writeLock.lock(); + old = allowedLocalityLevel.put(schedulerKey, level); + } finally { + writeLock.unlock(); + } + + LOG.info("Raising locality level from " + old + " to " + level + " at " + + " priority " + schedulerKey.getPriority()); } // related methods public void addPreemption(RMContainer container, long time) { assert preemptionMap.get(container) == null; - preemptionMap.put(container, time); - Resources.addTo(preemptedResources, container.getAllocatedResource()); + try { + writeLock.lock(); + preemptionMap.put(container, time); + Resources.addTo(preemptedResources, container.getAllocatedResource()); + } finally { + writeLock.unlock(); + } } public Long getContainerPreemptionTime(RMContainer container) { @@ -584,21 +615,35 @@ public class FSAppAttempt extends SchedulerApplicationAttempt getUser(), rmContainer.getContainer().getResource()); } - private synchronized void setReservation(SchedulerNode node) { - String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); - Set<String> rackReservations = reservations.get(rackName); - if (rackReservations == null) { - rackReservations = new HashSet<>(); - reservations.put(rackName, rackReservations); + private void setReservation(SchedulerNode node) { + String rackName = + node.getRackName() == null ? 
"NULL" : node.getRackName(); + + try { + writeLock.lock(); + Set<String> rackReservations = reservations.get(rackName); + if (rackReservations == null) { + rackReservations = new HashSet<>(); + reservations.put(rackName, rackReservations); + } + rackReservations.add(node.getNodeName()); + } finally { + writeLock.unlock(); } - rackReservations.add(node.getNodeName()); } - private synchronized void clearReservation(SchedulerNode node) { - String rackName = node.getRackName() == null ? "NULL" : node.getRackName(); - Set<String> rackReservations = reservations.get(rackName); - if (rackReservations != null) { - rackReservations.remove(node.getNodeName()); + private void clearReservation(SchedulerNode node) { + String rackName = + node.getRackName() == null ? "NULL" : node.getRackName(); + + try { + writeLock.lock(); + Set<String> rackReservations = reservations.get(rackName); + if (rackReservations != null) { + rackReservations.remove(node.getNodeName()); + } + } finally { + writeLock.unlock(); } } @@ -737,7 +782,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // For each priority, see if we can schedule a node local, rack local // or off-switch request. Rack of off-switch requests may be delayed // (not scheduled) in order to promote better locality. - synchronized (this) { + try { + writeLock.lock(); for (SchedulerRequestKey schedulerKey : keysToTry) { // Skip it for reserved container, since // we already check it in isValidReservation. 
@@ -772,8 +818,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 && localRequest != null && localRequest.getNumContainers() != 0) { - return assignContainer(node, localRequest, - NodeType.NODE_LOCAL, reserved, schedulerKey); + return assignContainer(node, localRequest, NodeType.NODE_LOCAL, + reserved, schedulerKey); } if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) { @@ -781,29 +827,31 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0 - && (allowedLocality.equals(NodeType.RACK_LOCAL) || - allowedLocality.equals(NodeType.OFF_SWITCH))) { - return assignContainer(node, rackLocalRequest, - NodeType.RACK_LOCAL, reserved, schedulerKey); + && (allowedLocality.equals(NodeType.RACK_LOCAL) || allowedLocality + .equals(NodeType.OFF_SWITCH))) { + return assignContainer(node, rackLocalRequest, NodeType.RACK_LOCAL, + reserved, schedulerKey); } - ResourceRequest offSwitchRequest = - getResourceRequest(schedulerKey, ResourceRequest.ANY); + ResourceRequest offSwitchRequest = getResourceRequest(schedulerKey, + ResourceRequest.ANY); if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) { continue; } - if (offSwitchRequest != null && - offSwitchRequest.getNumContainers() != 0) { - if (!hasNodeOrRackLocalRequests(schedulerKey) || - allowedLocality.equals(NodeType.OFF_SWITCH)) { - return assignContainer( - node, offSwitchRequest, NodeType.OFF_SWITCH, reserved, - schedulerKey); + if (offSwitchRequest != null + && offSwitchRequest.getNumContainers() != 0) { + if (!hasNodeOrRackLocalRequests(schedulerKey) || allowedLocality + .equals(NodeType.OFF_SWITCH)) { + return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH, + reserved, schedulerKey); } } } + } finally { + writeLock.unlock(); } + return Resources.none(); } @@ -963,14 +1011,17 @@ public class FSAppAttempt 
extends SchedulerApplicationAttempt Resources.addTo(demand, getCurrentConsumption()); // Add up outstanding resource requests - synchronized (this) { + try { + writeLock.lock(); for (SchedulerRequestKey k : getSchedulerKeys()) { ResourceRequest r = getResourceRequest(k, ResourceRequest.ANY); if (r != null) { - Resources.multiplyAndAddTo(demand, - r.getCapability(), r.getNumContainers()); + Resources.multiplyAndAddTo(demand, r.getCapability(), + r.getNumContainers()); } } + } finally { + writeLock.unlock(); } }
--------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org