Author: sseth
Date: Thu Sep 6 06:39:38 2012
New Revision: 1381473

URL: http://svn.apache.org/viewvc?rev=1381473&view=rev
Log:
MAPREDUCE-4620. RMContainerAllocator should factor in nodes being blacklisted (sseth)
Added:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java
Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1381473&r1=1381472&r2=1381473&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Thu Sep 6 06:39:38 2012 @@ -12,3 +12,5 @@ Branch MR-3902 MAPREDUCE-4624. Reduce scheduling fixes, factor in MR-4437. (sseth) MAPREDUCE-4619. Change AMContainerMap to extend AbstractService (Tsuyoshi OZAWA via sseth) + + MAPREDUCE-4620. RMContainerAllocator should factor in nodes being blacklisted. (sseth) Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java?rev=1381473&view=auto ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java (added) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventNodeBlacklisted.java Thu Sep 6 06:39:38 2012 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app2.rm; + +import org.apache.hadoop.yarn.api.records.NodeId; + +public class AMSchedulerEventNodeBlacklisted extends AMSchedulerEvent { + + private final NodeId nodeId; // May need to be host instead. + + public AMSchedulerEventNodeBlacklisted(NodeId nodeId, boolean headRoomChanged) { + super(AMSchedulerEventType.S_NODE_BLACKLISTED); + this.nodeId = nodeId; + } + + public NodeId getNodeId() { + return this.nodeId; + } +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java?rev=1381473&r1=1381472&r2=1381473&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerAllocator.java 
Thu Sep 6 06:39:38 2012 @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.YarnExcept import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; @@ -84,8 +85,6 @@ import org.apache.hadoop.yarn.util.RackR public class RMContainerAllocator extends AbstractService implements ContainerAllocator { -// TODO XXX: Factor in MAPREDUCE-4437. Reduce scheduling needs to be looked into IAC - static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); public static final @@ -115,8 +114,7 @@ public class RMContainerAllocator extend @SuppressWarnings("rawtypes") private final EventHandler eventHandler; private final AMContainerMap containerMap; - - //TODO XXX Make Configurable. + // Run the scheduler if it hasn't run for this interval. private long scheduleInterval = 1000l; @@ -219,7 +217,8 @@ public class RMContainerAllocator extend LOG.info("AMSchedulerConfiguration: " + "ReUseEnabled: " + shouldReUse + ", reduceSlowStart: " + reduceSlowStart + ", maxReduceRampupLimit: " + maxReduceRampupLimit + ", maxReducePreemptionLimit: " - + maxReducePreemptionLimit); + + maxReducePreemptionLimit + ", scheduleThreadInterval: " + + scheduleInterval + " ms"); RackResolver.init(conf); } @@ -351,7 +350,7 @@ public class RMContainerAllocator extend case S_CONTAINER_COMPLETED: //Nothing specific to be done in this scheduler. break; case S_NODE_BLACKLISTED: - // TODO XXX Withdraw requests related to this node and place new ones. + handleNodeBlacklisted((AMSchedulerEventNodeBlacklisted) sEvent); break; case S_NODE_UNHEALTHY: // Ignore. RM will not allocated containers on this node. 
@@ -376,10 +375,12 @@ public class RMContainerAllocator extend event.getCapability().setMemory(reduceResourceReqt); if (event.isRescheduled()) { pendingReduces.addFirst(new ContainerRequestInfo(new ContainerRequest( - event, PRIORITY_REDUCE), event)); + event.getCapability(), event.getHosts(), event.getRacks(), + PRIORITY_REDUCE), event)); } else { pendingReduces.addLast(new ContainerRequestInfo(new ContainerRequest( - event, PRIORITY_REDUCE), event)); + event.getCapability(), event.getHosts(), event.getRacks(), + PRIORITY_REDUCE), event)); } } } @@ -392,7 +393,7 @@ public class RMContainerAllocator extend removed = scheduledRequests.remove(aId); if (!removed) { // Maybe assigned. - ContainerId containerId = assignedRequests.getContainerId(aId); + ContainerId containerId = assignedRequests.remove(aId); if (containerId != null) { // Ask the container to stop. sendEvent(new AMContainerEvent(containerId, @@ -425,8 +426,6 @@ public class RMContainerAllocator extend + event.getAttemptID() + ". Full event: " + event); } } - - // TODO XXX: Deal with node blacklisting. private void handleContainersAllocated( AMSchedulerEventContainersAllocated event) { @@ -447,9 +446,23 @@ public class RMContainerAllocator extend schedule(); } - // TODO XXX: Deal with node blacklisting. - - + // TODO Add a test later if TestRMContainerAllocator does not have one for + // blacklisting. + private void handleNodeBlacklisted(AMSchedulerEventNodeBlacklisted event) { + NodeId nodeId = event.getNodeId(); + String host = nodeId.getHost(); + // Only maps would have asked for containers on a specific node. 
+ List<TaskAttemptId> affectedAttemptIds = scheduledRequests.mapsHostMapping.get(host); + for (TaskAttemptId taId : affectedAttemptIds) { + ContainerRequestInfo cr = scheduledRequests.maps.get(taId); + scheduledRequests.remove(taId); + scheduledRequests.addMap(cr.launchRequestEvent); + } + // Instead of removing / re-adding each individual request, it may be more + // efficient to modify internal data structures, and send a request to the + // RMComm to completely forget about a host. + } + // TODO Override for re-use. protected synchronized void assignContainers() { if (availableContainerIds.size() > 0) { @@ -771,14 +784,20 @@ public class RMContainerAllocator extend return null; } + /** + * Considers node blacklisting while create container ask requests for the + * RMContainerAllocator. + */ void addMap(AMSchedulerTALaunchRequestEvent event) { ContainerRequest request = null; - + if (event.isRescheduled()) { earlierFailedMaps.add(event.getAttemptID()); - request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP); + request = new ContainerRequest(event.getCapability(), event.getHosts(), + event.getRacks(), PRIORITY_FAST_FAIL_MAP); LOG.info("Added "+event.getAttemptID()+" to list of failed maps"); } else { + List<String> hosts = new LinkedList<String>(); for (String host : event.getHosts()) { LinkedList<TaskAttemptId> list = mapsHostMapping.get(host); if (list == null) { @@ -789,6 +808,14 @@ public class RMContainerAllocator extend if (LOG.isDebugEnabled()) { LOG.debug("Added attempt req to host " + host); } + if (!appContext.getAllNodes().isHostBlackListed(host)) { + hosts.add(host); + } else { + // Leaving the entries in mapsHostMapping etc. Will allow allocation + // in case all nodes get blacklisted / blacklisting gets enabled. + LOG.info("XXX: Host: " + host + + " is blacklisted. 
Not including in Container request"); + } } for (String rack: event.getRacks()) { LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack); @@ -801,9 +828,13 @@ public class RMContainerAllocator extend LOG.debug("Added attempt req to rack " + rack); } } - request = new ContainerRequest(event, PRIORITY_MAP); + request = new ContainerRequest(event.getCapability(), + hosts.toArray(new String[0]), event.getRacks(), PRIORITY_MAP); } -// ContainerRequestInfo csInfo = new ContainerRequestInfo(request, event.getAttemptID()); + // ContainerRequestInfo ends up with the correct ContainerRequest, and the + // original event. + // Remove works on the basis of the ContainerRequest while asking the + // RMComm to decrement a container request. maps.put(event.getAttemptID(), new ContainerRequestInfo(request, event)); requestor.addContainerReq(request); } @@ -864,40 +895,38 @@ public class RMContainerAllocator extend isAssignable = false; } } - -// boolean blackListed = false; - boolean nodeUsable = true; + + boolean nodeUnhealthy = false; + boolean blackListed = false; ContainerRequestInfo assigned = null; if (isAssignable) { - // do not assign if allocated container is on a - // blacklisted host String allocatedHost = allocated.getNodeId().getHost(); - // TODO XXX: Modify the Request table as and when containers are allocated on bad hosts, as against updating the table as soon as a node is blacklisted / lost. - // Blakclisted nodes should likely be removed immediately. // TODO Differentiation between blacklisted versus unusable nodes ? - boolean blackListed = appContext.getAllNodes().isHostBlackListed(allocatedHost); - nodeUsable = appContext.getNode(allocated.getNodeId()).isUsable(); + // Ideally there should be no assignments on unhealthy nodes. 
+ blackListed = appContext.getAllNodes().isHostBlackListed(allocatedHost); + nodeUnhealthy = appContext.getNode(allocated.getNodeId()).isUnhealthy(); - if (!nodeUsable || blackListed) { + if (nodeUnhealthy || blackListed) { // we need to request for a new container // and release the current one - LOG.info("Got allocated container on an unusable " - + " host "+allocatedHost - +". Releasing container " + allocated); - - // find the request matching this allocated container - // and replace it with a new one + LOG.info("Got allocated container on an unusable " + " host " + + allocatedHost + ". Releasing container " + allocated + + " NodeUnhealthy: " + nodeUnhealthy + ", NodeBlackListed: " + + blackListed); + + // find the request matching this allocated container and replace it + // with a new one. Have to ensure a request goes out to the RM + // asking for a new container. Hence a decRequest + addRequest. ContainerRequestInfo toBeReplacedReq = getContainerReqToReplace(allocated); - - // TODO XXX: Requirement here is to be able to figure out the taskAttemptId for which this request was put. If that's being replaced, update corresponding maps with info. - // Effectively a RequestInfo to attemptId map - or a structure which includes both. - + if (toBeReplacedReq != null) { LOG.info("Placing a new container request for task attempt " + toBeReplacedReq.getAttemptId()); + // This isn't necessarily needed, since the request should have changed + // when the node blacklist event was received. ContainerRequestInfo newReq = getFilteredContainerRequest(toBeReplacedReq); requestor.decContainerReq(toBeReplacedReq.getContainerRequest()); @@ -922,15 +951,16 @@ public class RMContainerAllocator extend requestor.decContainerReq(assigned.getContainerRequest()); // TODO Maybe: ApplicationACLs should be populated into the appContext from the RMCommunicator. - - - // TODO XXX: Launch only if not already running. - // TODO XXX: Change this event to be more specific. 
+ if (appContext.getContainer(containerId).getState() == AMContainerState.ALLOCATED) { - eventHandler.handle(new AMContainerLaunchRequestEvent(containerId, attemptToLaunchRequestMap.get(assigned.getAttemptId()), requestor.getApplicationAcls(), getJob().getID())); + eventHandler.handle(new AMContainerLaunchRequestEvent( + containerId, attemptToLaunchRequestMap.get(assigned + .getAttemptId()), requestor.getApplicationAcls(), + getJob().getID())); } - eventHandler.handle(new AMContainerAssignTAEvent(containerId, assigned.getAttemptId(), attemptToLaunchRequestMap.get(assigned.getAttemptId()).getRemoteTask())); - // TODO XXX: If re-using, get rid of one request. + eventHandler.handle(new AMContainerAssignTAEvent(containerId, + assigned.getAttemptId(), attemptToLaunchRequestMap.get( + assigned.getAttemptId()).getRemoteTask())); assignedRequests.add(allocated, assigned.getAttemptId()); @@ -951,15 +981,13 @@ public class RMContainerAllocator extend // release container if it was blacklisted // or if we could not assign it - if (!nodeUsable || assigned == null) { + if (blackListed || nodeUnhealthy || assigned == null) { containersReleased++; sendEvent(new AMContainerEvent(containerId, AMContainerEventType.C_STOP_REQUEST)); } } } - - // TODO XXX: Check whether the node is bad before an assign. - + private ContainerRequestInfo assign(Container allocated) { ContainerRequestInfo assigned = null; @@ -1070,7 +1098,8 @@ public class RMContainerAllocator extend assigned = maps.remove(tId); JobCounterUpdateEvent jce = new JobCounterUpdateEvent(tId.getTaskId().getJobId()); - // TODO XXX: Move these counter updated to go out from the TaskAttempt. + // TODO XXX (After MR-3902 if the counter updates are correct): Move + // these counter updated to go out from the TaskAttempt. 
jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1); eventHandler.handle(jce); hostLocalAssigned++; @@ -1158,20 +1187,7 @@ public class RMContainerAllocator extend eventHandler.handle(new TaskAttemptEventKillRequest(id, "Pre-empting reduce")); } } - - ContainerId getContainerId(TaskAttemptId taId) { - ContainerId containerId = null; - if (taId.getTaskId().getTaskType().equals(TaskType.MAP)) { - containerId = maps.get(taId).getId(); - } else { - containerId = reduces.get(taId).getId(); - } - return containerId; - } - - // TODO XXX Check where all this is being used. - // XXX: Likely needed in case of TA failed / killed / terminated as well. - // Old code was removing when CONTAINER_COMPLETED was received fromthe RM. + ContainerId remove(TaskAttemptId tId) { ContainerId containerId = null; if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) { @@ -1179,7 +1195,6 @@ public class RMContainerAllocator extend } else { containerId = reduces.remove(tId).getId(); if (containerId != null) { - // TODO XXX -> Revisit remove(), semantics change. 
boolean preempted = preemptionWaitingReduces.remove(tId); if (preempted) { LOG.info("Reduce preemption successful " + tId); Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1381473&r1=1381472&r2=1381473&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java Thu Sep 6 06:39:38 2012 @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventReleased; +import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventNodeCountUpdated; import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventStateChanged; import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.YarnException; @@ -95,12 +96,12 @@ public class RMContainerRequestor extend private final List<ContainerId> emptyReleaseList = new ArrayList<ContainerId>(0); private final List<ResourceRequest> emptyAskList = new ArrayList<ResourceRequest>(); - // TODO XXX: May need to pass this to the AMNodeMap private int clusterNmCount = 0; // TODO XXX Consider allowing sync comm between the requestor and allocator... 
- // TODO (after 3902): Why does the RMRequestor require the ClientService ?? (for the RPC address. get rid of this.) + // TODO (after 3902): Why does the RMRequestor require the ClientService ?? + // (for the RPC address. get rid of this.) public RMContainerRequestor(ClientService clientService, AppContext context) { super(clientService, context); this.clock = context.getClock(); @@ -112,11 +113,6 @@ public class RMContainerRequestor extend final String[] racks; final Priority priority; - public ContainerRequest(AMSchedulerTALaunchRequestEvent event, - Priority priority) { - this(event.getCapability(), event.getHosts(), event.getRacks(), priority); - } - public ContainerRequest(Resource capability, String[] hosts, String[] racks, Priority priority) { this.capability = capability; @@ -157,9 +153,9 @@ public class RMContainerRequestor extend // Create resource requests for (String host : req.hosts) { // Data-local - if (!context.getAllNodes().isHostBlackListed(host)) { - addResourceRequest(req.priority, host, req.capability); - } + // Assumes the scheduler is handling bad nodes. Tracking them here would + // lead to an out-of-sync scheduler / requestor. + addResourceRequest(req.priority, host, req.capability); } // Nothing Rack-local for now @@ -316,6 +312,7 @@ public class RMContainerRequestor extend LOG.info("BeforeHeartbeat: " + getStat()); int headRoom = getAvailableResources() != null ? getAvailableResources() .getMemory() : 0;// first time it would be null + int lastClusterNmCount = clusterNmCount; AMResponse response = errorCheckedMakeRemoteRequest(); int newHeadRoom = getAvailableResources() != null ? 
getAvailableResources() @@ -334,6 +331,12 @@ public class RMContainerRequestor extend LOG.info("AfterHeartbeat: " + getStat()); + if (clusterNmCount != lastClusterNmCount) { + LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to " + + clusterNmCount); + eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount)); + } + // Inform the Containers about completion.. for (ContainerStatus c : finishedContainers) { eventHandler.handle(new AMContainerEventReleased(c)); @@ -344,7 +347,6 @@ public class RMContainerRequestor extend if (newContainers.size() > 0) { newContainerIds = new ArrayList<ContainerId>(newContainers.size()); for (Container container : newContainers) { - // TODO XXX Re-factor AMNodes and AMContainers. context.getAllContainers().addNewContainer(container); newContainerIds.add(container.getId()); context.getAllNodes().nodeSeen(container.getNodeId()); @@ -432,7 +434,6 @@ public class RMContainerRequestor extend RMCommunicatorContainerDeAllocateRequestEvent event = (RMCommunicatorContainerDeAllocateRequestEvent) rawEvent; releaseLock.lock(); try { - // TODO XXX: Currently the RM does not handle release requests for RUNNING containers. 
numContainerReleaseRequests++; release.add(event.getContainerId()); } finally { Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java?rev=1381473&r1=1381472&r2=1381473&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNode.java Thu Sep 6 06:39:38 2012 @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.mapreduce.v2.app2.rm.node; import java.util.List; @@ -12,6 +30,6 @@ public interface AMNode extends EventHan public AMNodeState getState(); public List<ContainerId> getContainers(); - public boolean isUsable(); + public boolean isUnhealthy(); public boolean isBlacklisted(); } Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java?rev=1381473&r1=1381472&r2=1381473&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEvent.java Thu Sep 6 06:39:38 2012 @@ -1,10 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.mapreduce.v2.app2.rm.node; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.AbstractEvent; -// TODO: Implement. - public class AMNodeEvent extends AbstractEvent<AMNodeEventType> { private final NodeId nodeId; Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java?rev=1381473&view=auto ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java (added) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventNodeCountUpdated.java Thu Sep 6 06:39:38 2012 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app2.rm.node; + +public class AMNodeEventNodeCountUpdated extends AMNodeEvent { + + private final int count; + + public AMNodeEventNodeCountUpdated(int nodeCount) { + super(null, AMNodeEventType.N_NODE_COUNT_UPDATED); + this.count = nodeCount; + } + + public int getNodeCount() { + return this.count; + } +} Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java?rev=1381473&r1=1381472&r2=1381473&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeEventType.java Thu Sep 6 06:39:38 2012 @@ -1,3 +1,21 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + package org.apache.hadoop.mapreduce.v2.app2.rm.node; public enum AMNodeEventType { @@ -11,11 +29,13 @@ public enum AMNodeEventType { //Producer: RMCommunicator N_TURNED_UNHEALTHY, N_TURNED_HEALTHY, + N_NODE_COUNT_UPDATED, //Producer: AMNodeManager N_BLACKLISTING_ENABLED, N_BLACKLISTING_DISABLED, - //Producer: Node - Will not reach NodeImpl. Used to compute whether blacklisting should be ignored. + // Producer: AMNode - Will not reach AMNodeImpl. Used to compute whether + // blacklisting should be ignored. 
N_NODE_WAS_BLACKLISTED -} +} \ No newline at end of file Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java?rev=1381473&r1=1381472&r2=1381473&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeImpl.java Thu Sep 6 06:39:38 2012 @@ -1,3 +1,21 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + package org.apache.hadoop.mapreduce.v2.app2.rm.node; import java.util.EnumSet; @@ -378,11 +396,10 @@ public class AMNodeImpl implements AMNod } @Override - public boolean isUsable() { + public boolean isUnhealthy() { this.readLock.lock(); try { - return (EnumSet.of(AMNodeState.ACTIVE, AMNodeState.FORCED_ACTIVE) - .contains(getState())); + return getState() == AMNodeState.UNHEALTHY; } finally { this.readLock.unlock(); } Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java?rev=1381473&r1=1381472&r2=1381473&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/node/AMNodeMap.java Thu Sep 6 06:39:38 2012 @@ -1,3 +1,21 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + package org.apache.hadoop.mapreduce.v2.app2.rm.node; import java.util.ArrayList; @@ -15,26 +33,28 @@ import org.apache.hadoop.yarn.service.Ab public class AMNodeMap extends AbstractService implements EventHandler<AMNodeEvent> { - + static final Log LOG = LogFactory.getLog(AMNodeMap.class); private final ConcurrentHashMap<NodeId, AMNode> nodeMap; - // TODO XXX -> blacklistMap is also used for computing forcedUnblacklisting. private final ConcurrentHashMap<String, ArrayList<NodeId>> blacklistMap; private final EventHandler<?> eventHandler; private final AppContext appContext; + private int numClusterNodes; + private boolean ignoreBlacklisting = false; private int maxTaskFailuresPerNode; private boolean nodeBlacklistingEnabled; private int blacklistDisablePercent; + + // TODO XXX Ensure there's a test for IgnoreBlacklisting in + // TestRMContainerAllocator. Otherwise add one. public AMNodeMap(EventHandler<?> eventHandler, AppContext appContext) { super("AMNodeMap"); this.nodeMap = new ConcurrentHashMap<NodeId, AMNode>(); this.blacklistMap = new ConcurrentHashMap<String, ArrayList<NodeId>>(); this.eventHandler = eventHandler; this.appContext = appContext; - - // TODO XXX: Get a handle of allowed failures. } @Override @@ -66,13 +86,14 @@ public class AMNodeMap extends AbstractS } public boolean isHostBlackListed(String hostname) { - if (!nodeBlacklistingEnabled) { + // TODO Maybe: For now, forced unblacklisting is being handled + // here. An AMNode will never go into the BLACKLISTED / FORCED_ACTIVE state. 
+ if (!nodeBlacklistingEnabled || ignoreBlacklisting) { return false; } - return blacklistMap.containsKey(hostname); } - + private void addToBlackList(NodeId nodeId) { String host = nodeId.getHost(); ArrayList<NodeId> nodes; @@ -99,23 +120,59 @@ public class AMNodeMap extends AbstractS } } */ - - public void handle(AMNodeEvent event) { - if (event.getType() == AMNodeEventType.N_NODE_WAS_BLACKLISTED) { - NodeId nodeId = event.getNodeId(); + + public void handle(AMNodeEvent rEvent) { + // No synchronization required until there's multiple dispatchers. + NodeId nodeId = rEvent.getNodeId(); + switch (rEvent.getType()) { + case N_NODE_WAS_BLACKLISTED: addToBlackList(nodeId); - } else { - NodeId nodeId = event.getNodeId(); - nodeMap.get(nodeId).handle(event); + computeIgnoreBlacklisting(); + break; + case N_NODE_COUNT_UPDATED: + AMNodeEventNodeCountUpdated event = (AMNodeEventNodeCountUpdated) rEvent; + numClusterNodes = event.getNodeCount(); + computeIgnoreBlacklisting(); + break; + default: + nodeMap.get(nodeId).handle(rEvent); } } - + + // May be incorrect if there's multiple NodeManagers running on a single host. + // knownNodeCount is based on node managers, not hosts. blacklisting is + // currently based on hosts. + protected void computeIgnoreBlacklisting() { + if (!nodeBlacklistingEnabled) { + return; + } + if (blacklistDisablePercent != -1) { + if (numClusterNodes == 0) { + LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting"); + return; + } + int val = (int) ((float) blacklistMap.size() / numClusterNodes * 100); + if (val >= blacklistDisablePercent) { + if (ignoreBlacklisting == false) { + ignoreBlacklisting = true; + LOG.info("Ignore Blacklisting set to true. Known: " + numClusterNodes + + ", Blacklisted: " + blacklistMap.size()); + } + } else { + if (ignoreBlacklisting == true) { + ignoreBlacklisting = false; + LOG.info("Ignore blacklisting set to false. 
Known: " + + numClusterNodes + ", Blacklisted: " + blacklistMap.size()); + } + } + } + } + public AMNode get(NodeId nodeId) { return nodeMap.get(nodeId); } - + public int size() { return nodeMap.size(); } - -} +} \ No newline at end of file