Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -38,6 +39,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; @@ -58,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; @@ -70,9 +73,12 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; @@ -143,19 +149,26 @@ public class RMContainerAllocator extend private int lastCompletedTasks = 0; private boolean recalculateReduceSchedule = false; - private int mapResourceReqt;//memory - private int reduceResourceReqt;//memory + private int mapResourceRequest;//memory + private int reduceResourceRequest;//memory private boolean reduceStarted = false; private float maxReduceRampupLimit = 0; private float maxReducePreemptionLimit = 0; + /** + * after this threshold, if the container request is not allocated, it is + * considered delayed. 
+ */ + private long allocationDelayThresholdMs = 0; private float reduceSlowStart = 0; private long retryInterval; private long retrystartTime; + private Clock clock; private final AMPreemptionPolicy preemptionPolicy; - BlockingQueue<ContainerAllocatorEvent> eventQueue + @VisibleForTesting + protected BlockingQueue<ContainerAllocatorEvent> eventQueue = new LinkedBlockingQueue<ContainerAllocatorEvent>(); private ScheduleStats scheduleStats = new ScheduleStats(); @@ -165,6 +178,7 @@ public class RMContainerAllocator extend super(clientService, context); this.preemptionPolicy = preemptionPolicy; this.stopped = new AtomicBoolean(false); + this.clock = context.getClock(); } @Override @@ -179,6 +193,9 @@ public class RMContainerAllocator extend maxReducePreemptionLimit = conf.getFloat( MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT, MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT); + allocationDelayThresholdMs = conf.getInt( + MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, + MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms RackResolver.init(conf); retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); @@ -245,7 +262,7 @@ public class RMContainerAllocator extend getJob().getTotalMaps(), completedMaps, scheduledRequests.maps.size(), scheduledRequests.reduces.size(), assignedRequests.maps.size(), assignedRequests.reduces.size(), - mapResourceReqt, reduceResourceReqt, + mapResourceRequest, reduceResourceRequest, pendingReduces.size(), maxReduceRampupLimit, reduceSlowStart); recalculateReduceSchedule = false; @@ -267,6 +284,18 @@ public class RMContainerAllocator extend scheduleStats.log("Final Stats: "); } + @Private + @VisibleForTesting + AssignedRequests getAssignedRequests() { + return assignedRequests; + } + + @Private + @VisibleForTesting + ScheduledRequests getScheduledRequests() { + return scheduledRequests; + } + public boolean getIsReduceStarted() { return reduceStarted; } @@ -302,16 +331,16 @@ public class RMContainerAllocator extend int supportedMaxContainerCapability = getMaxContainerCapability().getMemory(); if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) { - if (mapResourceReqt == 0) { - mapResourceReqt = reqEvent.getCapability().getMemory(); + if (mapResourceRequest == 0) { + mapResourceRequest = reqEvent.getCapability().getMemory(); eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, - mapResourceReqt))); - LOG.info("mapResourceReqt:"+mapResourceReqt); - if (mapResourceReqt > supportedMaxContainerCapability) { + mapResourceRequest))); + LOG.info("mapResourceRequest:"+ mapResourceRequest); + if (mapResourceRequest > supportedMaxContainerCapability) { String diagMsg = "MAP capability required is more than the supported " + - "max container capability in the cluster. Killing the Job. mapResourceReqt: " + - mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability; + "max container capability in the cluster. Killing the Job. 
mapResourceRequest: " + + mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability; LOG.info(diagMsg); eventHandler.handle(new JobDiagnosticsUpdateEvent( jobId, diagMsg)); @@ -319,20 +348,20 @@ public class RMContainerAllocator extend } } //set the rounded off memory - reqEvent.getCapability().setMemory(mapResourceReqt); + reqEvent.getCapability().setMemory(mapResourceRequest); scheduledRequests.addMap(reqEvent);//maps are immediately scheduled } else { - if (reduceResourceReqt == 0) { - reduceResourceReqt = reqEvent.getCapability().getMemory(); + if (reduceResourceRequest == 0) { + reduceResourceRequest = reqEvent.getCapability().getMemory(); eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent( org.apache.hadoop.mapreduce.TaskType.REDUCE, - reduceResourceReqt))); - LOG.info("reduceResourceReqt:"+reduceResourceReqt); - if (reduceResourceReqt > supportedMaxContainerCapability) { + reduceResourceRequest))); + LOG.info("reduceResourceRequest:"+ reduceResourceRequest); + if (reduceResourceRequest > supportedMaxContainerCapability) { String diagMsg = "REDUCE capability required is more than the " + "supported max container capability in the cluster. Killing the " + - "Job. reduceResourceReqt: " + reduceResourceReqt + + "Job. reduceResourceRequest: " + reduceResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability; LOG.info(diagMsg); eventHandler.handle(new JobDiagnosticsUpdateEvent( @@ -341,7 +370,7 @@ public class RMContainerAllocator extend } } //set the rounded off memory - reqEvent.getCapability().setMemory(reduceResourceReqt); + reqEvent.getCapability().setMemory(reduceResourceRequest); if (reqEvent.getEarlierAttemptFailed()) { //add to the front of queue for fail fast pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); @@ -365,6 +394,7 @@ public class RMContainerAllocator extend removed = true; assignedRequests.remove(aId); containersReleased++; + pendingRelease.add(containerId); release(containerId); } } @@ -393,8 +423,22 @@ public class RMContainerAllocator extend return host; } - private void preemptReducesIfNeeded() { - if (reduceResourceReqt == 0) { + @Private + @VisibleForTesting + synchronized void setReduceResourceRequest(int mem) { + this.reduceResourceRequest = mem; + } + + @Private + @VisibleForTesting + synchronized void setMapResourceRequest(int mem) { + this.mapResourceRequest = mem; + } + + @Private + @VisibleForTesting + void preemptReducesIfNeeded() { + if (reduceResourceRequest == 0) { return; //no reduces } //check if reduces have taken over the whole cluster and there are @@ -402,9 +446,9 @@ public class RMContainerAllocator extend if (scheduledRequests.maps.size() > 0) { int memLimit = getMemLimit(); int availableMemForMap = memLimit - ((assignedRequests.reduces.size() - - assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt); + assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest); //availableMemForMap must be sufficient to run atleast 1 map - if (availableMemForMap < mapResourceReqt) { + if (availableMemForMap < mapResourceRequest) { //to make sure new containers are given to maps and not reduces //ramp down all scheduled reduces if any //(since reduces are scheduled at higher priority than maps) @@ -413,22 +457,40 @@ public class RMContainerAllocator extend pendingReduces.add(req); } scheduledRequests.reduces.clear(); - - //preempt for making space for at least one map - int premeptionLimit = Math.max(mapResourceReqt, - (int) 
(maxReducePreemptionLimit * memLimit)); - - int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt, - premeptionLimit); - - int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt); - toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); - - LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); - assignedRequests.preemptReduce(toPreempt); + + //do further checking to find the number of map requests that were + //hanging around for a while + int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps); + if (hangingMapRequests > 0) { + //preempt for making space for at least one map + int premeptionLimit = Math.max(mapResourceRequest, + (int) (maxReducePreemptionLimit * memLimit)); + + int preemptMem = Math.min(hangingMapRequests * mapResourceRequest, + premeptionLimit); + + int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest); + toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); + + LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); + assignedRequests.preemptReduce(toPreempt); + } } } } + + private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) { + if (allocationDelayThresholdMs <= 0) + return requestMap.size(); + int hangingRequests = 0; + long currTime = clock.getTime(); + for (ContainerRequest request: requestMap.values()) { + long delay = currTime - request.requestTimeMs; + if (delay > allocationDelayThresholdMs) + hangingRequests++; + } + return hangingRequests; + } @Private public void scheduleReduces( @@ -585,6 +647,15 @@ public class RMContainerAllocator extend if (response.getAMCommand() != null) { switch(response.getAMCommand()) { case AM_RESYNC: + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + lastResponseID = 0; + + // Registering to allow RM to discover an active AM for this + // application + register(); + addOutstandingRequestOnResync(); + break; case AM_SHUTDOWN: // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. 
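The new AM_RESYNC branch in the hunk above changes how the AM reacts to a ResourceManager restart: instead of falling through to the AM_SHUTDOWN handling, it zeroes lastResponseID, re-registers so the restarted RM can rediscover an active AM, and replays outstanding state (see addOutstandingRequestOnResync() in RMContainerRequestor.java further down). A minimal sketch of that replay pattern — the class and field names below are illustrative stand-ins, not Hadoop APIs:

// Hypothetical sketch of the resync pattern introduced above.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AmRmResyncSketch {
  int lastResponseID = 0;
  final List<String> outstandingAsks = new ArrayList<>(); // unfulfilled resource requests
  final Set<String> pendingRelease = new HashSet<>();     // releases not yet confirmed by the RM
  final List<String> ask = new ArrayList<>();             // payload of the next allocate() call
  final Set<String> release = new HashSet<>();            // payload of the next allocate() call

  // Invoked when allocate() returns AMCommand.AM_RESYNC.
  void onResync() {
    lastResponseID = 0;             // a restarted RM numbers its responses from scratch
    register();                     // let the RM discover an active AM for this application again
    ask.addAll(outstandingAsks);    // replay requests the restarted RM never saw
    release.addAll(pendingRelease); // replay releases never acknowledged by a
                                    // completed-container status
  }

  void register() { /* AM-to-RM registration RPC omitted from this sketch */ }
}

Replaying pendingRelease is why this patch also starts tracking released container ids until the RM reports them completed; without that history, releases issued just before a restart would be silently lost.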
@@ -608,7 +679,12 @@ public class RMContainerAllocator extend nmToken.getToken()); } } - + + // Setting AMRMToken + if (response.getAMRMToken() != null) { + updateAMRMToken(response.getAMRMToken()); + } + List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses(); // propagate preemption requests @@ -644,6 +720,7 @@ public class RMContainerAllocator extend LOG.error("Container complete event for unknown container id " + cont.getContainerId()); } else { + pendingRelease.remove(cont.getContainerId()); assignedRequests.remove(attemptID); // send the container completed event to Task attempt @@ -659,11 +736,24 @@ public class RMContainerAllocator extend } return newContainers; } - + + private void updateAMRMToken(Token token) throws IOException { + org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken = + new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token + .getIdentifier().array(), token.getPassword().array(), new Text( + token.getKind()), new Text(token.getService())); + UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser(); + if (UserGroupInformation.isSecurityEnabled()) { + currentUGI = UserGroupInformation.getLoginUser(); + } + currentUGI.addToken(amrmToken); + } + @VisibleForTesting public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont, TaskAttemptId attemptID) { - if (cont.getExitStatus() == ContainerExitStatus.ABORTED) { + if (cont.getExitStatus() == ContainerExitStatus.ABORTED + || cont.getExitStatus() == ContainerExitStatus.PREEMPTED) { // killed by framework return new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_KILL); @@ -714,11 +804,13 @@ public class RMContainerAllocator extend @Private public int getMemLimit() { int headRoom = getAvailableResources() != null ? 
getAvailableResources().getMemory() : 0; - return headRoom + assignedRequests.maps.size() * mapResourceReqt + - assignedRequests.reduces.size() * reduceResourceReqt; + return headRoom + assignedRequests.maps.size() * mapResourceRequest + + assignedRequests.reduces.size() * reduceResourceRequest; } - - private class ScheduledRequests { + + @Private + @VisibleForTesting + class ScheduledRequests { private final LinkedList<TaskAttemptId> earlierFailedMaps = new LinkedList<TaskAttemptId>(); @@ -728,7 +820,8 @@ public class RMContainerAllocator extend new HashMap<String, LinkedList<TaskAttemptId>>(); private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping = new HashMap<String, LinkedList<TaskAttemptId>>(); - private final Map<TaskAttemptId, ContainerRequest> maps = + @VisibleForTesting + final Map<TaskAttemptId, ContainerRequest> maps = new LinkedHashMap<TaskAttemptId, ContainerRequest>(); private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces = @@ -824,22 +917,22 @@ public class RMContainerAllocator extend int allocatedMemory = allocated.getResource().getMemory(); if (PRIORITY_FAST_FAIL_MAP.equals(priority) || PRIORITY_MAP.equals(priority)) { - if (allocatedMemory < mapResourceReqt + if (allocatedMemory < mapResourceRequest || maps.isEmpty()) { LOG.info("Cannot assign container " + allocated + " for a map as either " - + " container memory less than required " + mapResourceReqt + + " container memory less than required " + mapResourceRequest + " or no pending map tasks - maps.isEmpty=" + maps.isEmpty()); isAssignable = false; } } else if (PRIORITY_REDUCE.equals(priority)) { - if (allocatedMemory < reduceResourceReqt + if (allocatedMemory < reduceResourceRequest || reduces.isEmpty()) { LOG.info("Cannot assign container " + allocated + " for a reduce as either " - + " container memory less than required " + reduceResourceReqt + + " container memory less than required " + reduceResourceRequest + " or no pending reduce tasks - reduces.isEmpty=" + reduces.isEmpty()); isAssignable = false; @@ -931,6 +1024,7 @@ public class RMContainerAllocator extend private void containerNotAssigned(Container allocated) { containersReleased++; + pendingRelease.add(allocated.getId()); release(allocated.getId()); } @@ -1118,14 +1212,18 @@ public class RMContainerAllocator extend } } - private class AssignedRequests { + @Private + @VisibleForTesting + class AssignedRequests { private final Map<ContainerId, TaskAttemptId> containerToAttemptMap = new HashMap<ContainerId, TaskAttemptId>(); private final LinkedHashMap<TaskAttemptId, Container> maps = new LinkedHashMap<TaskAttemptId, Container>(); - private final LinkedHashMap<TaskAttemptId, Container> reduces = + @VisibleForTesting + final LinkedHashMap<TaskAttemptId, Container> reduces = new LinkedHashMap<TaskAttemptId, Container>(); - private final Set<TaskAttemptId> preemptionWaitingReduces = + @VisibleForTesting + final Set<TaskAttemptId> preemptionWaitingReduces = new HashSet<TaskAttemptId>(); void add(Container container, TaskAttemptId tId) {
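The preemption change in RMContainerAllocator above is the substantive one: scheduled (not-yet-assigned) reduces are still ramped down immediately when maps are starved, but running reduces are now preempted only for map requests that have been waiting longer than allocationDelayThresholdMs (MR_JOB_REDUCER_PREEMPT_DELAY_SEC converted from seconds to milliseconds). A self-contained sketch of the "hanging request" arithmetic, mirroring getNumOfHangingRequests() under the assumption that only the request timestamps matter:

// Standalone sketch of the hanging-request count used above; a request is
// "hanging" once it has waited longer than the configured threshold.
import java.util.LinkedHashMap;
import java.util.Map;

class HangingRequestSketch {
  static int countHanging(Map<String, Long> requestTimesMs, long nowMs, long thresholdMs) {
    if (thresholdMs <= 0) {
      return requestTimesMs.size(); // threshold disabled: every request qualifies
    }
    int hanging = 0;
    for (long requestTimeMs : requestTimesMs.values()) {
      if (nowMs - requestTimeMs > thresholdMs) {
        hanging++;
      }
    }
    return hanging;
  }

  public static void main(String[] args) {
    Map<String, Long> requests = new LinkedHashMap<>();
    requests.put("attempt_m_000000", 0L);      // has waited 95s
    requests.put("attempt_m_000001", 60_000L); // has waited 35s
    // With a 60s threshold only the first request is hanging, so preemption
    // is sized for one map rather than for every scheduled map.
    System.out.println(countHanging(requests, 95_000L, 60_000L)); // prints 1
  }
}

Sizing then proceeds as before, but from hangingMapRequests instead of scheduledRequests.maps.size(): preemptMem is capped by maxReducePreemptionLimit * memLimit, and toPreempt = ceil(preemptMem / reduceResourceRequest), bounded by the number of assigned reduces.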
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Tue Aug 19 23:49:39 2014 @@ -29,8 +29,10 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -38,6 +40,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -56,7 +59,7 @@ public abstract class RMContainerRequest private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class); - private int lastResponseID; + protected int lastResponseID; private Resource availableResources; private final RecordFactory recordFactory = @@ -75,8 +78,11 @@ public abstract class RMContainerRequest // numContainers dont end up as duplicates private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>( new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator()); - private final Set<ContainerId> release = new TreeSet<ContainerId>(); - + private final Set<ContainerId> release = new TreeSet<ContainerId>(); + // pendingRelease holds history or release requests.request is removed only if + // RM sends completedContainer. + // How it different from release? --> release is for per allocate() request. 
+ protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>(); private boolean nodeBlacklistingEnabled; private int blacklistDisablePercent; private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false); @@ -96,6 +102,8 @@ public abstract class RMContainerRequest super(clientService, context); } + @Private + @VisibleForTesting static class ContainerRequest { final TaskAttemptId attemptID; final Resource capability; @@ -103,20 +111,39 @@ public abstract class RMContainerRequest final String[] racks; //final boolean earlierAttemptFailed; final Priority priority; - + /** + * the time when this request object was formed; can be used to avoid + * aggressive preemption for recently placed requests + */ + final long requestTimeMs; + public ContainerRequest(ContainerRequestEvent event, Priority priority) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), event.getRacks(), priority); } - + + public ContainerRequest(ContainerRequestEvent event, Priority priority, + long requestTimeMs) { + this(event.getAttemptID(), event.getCapability(), event.getHosts(), + event.getRacks(), priority, requestTimeMs); + } + + public ContainerRequest(TaskAttemptId attemptID, + Resource capability, String[] hosts, String[] racks, + Priority priority) { + this(attemptID, capability, hosts, racks, priority, + System.currentTimeMillis()); + } + public ContainerRequest(TaskAttemptId attemptID, - Resource capability, String[] hosts, String[] racks, - Priority priority) { + Resource capability, String[] hosts, String[] racks, + Priority priority, long requestTimeMs) { this.attemptID = attemptID; this.capability = capability; this.hosts = hosts; this.racks = racks; this.priority = priority; + this.requestTimeMs = requestTimeMs; } public String toString() { @@ -163,6 +190,10 @@ public abstract class RMContainerRequest } catch (YarnException e) { throw new IOException(e); } + + if (isResyncCommand(allocateResponse)) { + return allocateResponse; + } lastResponseID = allocateResponse.getResponseId(); availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; @@ -191,6 +222,28 @@ public abstract class RMContainerRequest return allocateResponse; } + protected boolean isResyncCommand(AllocateResponse allocateResponse) { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } + + protected void addOutstandingRequestOnResync() { + for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable + .values()) { + for (Map<Resource, ResourceRequest> capabalities : rr.values()) { + for (ResourceRequest request : capabalities.values()) { + addResourceRequestToAsk(request); + } + } + } + if (!ignoreBlacklisting.get()) { + blacklistAdditions.addAll(blacklistedNodes); + } + if (!pendingRelease.isEmpty()) { + release.addAll(pendingRelease); + } + } + // May be incorrect if there's multiple NodeManagers running on a single host. // knownNodeCount is based on node managers, not hosts. blacklisting is // currently based on hosts. 
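To support the delay check, ContainerRequest above now records requestTimeMs at construction, and pendingRelease keeps released container ids until the RM confirms completion so they can be replayed after a resync. Enabling the delayed-preemption behaviour is then a one-line configuration change; a sketch using the MRJobConfig constant from this patch (the 60-second value is an arbitrary example, not a recommendation):

// Sketch: configuring the reducer-preemption delay added by this patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class PreemptDelayConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Map requests younger than this many seconds do not trigger reducer
    // preemption; RMContainerAllocator multiplies the value by 1000 (sec -> ms).
    conf.setInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, 60);
    // A value <= 0 makes every unallocated map request count as hanging,
    // which restores the previous, more aggressive preemption behaviour.
    System.out.println(conf.getInt(MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, 0));
  }
}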
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java Tue Aug 19 23:49:39 2014 @@ -29,7 +29,9 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.PreemptionContainer; +import org.apache.hadoop.yarn.api.records.PreemptionContract; import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.event.EventHandler; /** @@ -52,13 +54,18 @@ public class KillAMPreemptionPolicy impl public void preempt(Context ctxt, PreemptionMessage preemptionRequests) { // for both strict and negotiable preemption requests kill the // container - for (PreemptionContainer c : - preemptionRequests.getStrictContract().getContainers()) { - killContainer(ctxt, c); + StrictPreemptionContract strictContract = preemptionRequests + .getStrictContract(); + if (strictContract != null) { + for (PreemptionContainer c : strictContract.getContainers()) { + killContainer(ctxt, c); + } } - for (PreemptionContainer c : - preemptionRequests.getContract().getContainers()) { - killContainer(ctxt, c); + PreemptionContract contract = preemptionRequests.getContract(); + if (contract != null) { + for (PreemptionContainer c : contract.getContainers()) { + killContainer(ctxt, c); + } } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Tue Aug 19 23:49:39 2014 @@ -85,7 +85,8 @@ public class TaskPage extends AppView { .append(ta.getId()).append("\",\"") .append(progress).append("\",\"") .append(ta.getState().toString()).append("\",\"") - .append(ta.getStatus()).append("\",\"") + 
.append(StringEscapeUtils.escapeJavaScript( + StringEscapeUtils.escapeHtml(ta.getStatus()))).append("\",\"") .append(nodeHttpAddr == null ? "N/A" : "<a class='nodelink' href='" + MRWebAppUtil.getYARNWebappScheme() + nodeHttpAddr + "'>" Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TasksBlock.java Tue Aug 19 23:49:39 2014 @@ -25,6 +25,7 @@ import static org.apache.hadoop.yarn.uti import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR; import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE; +import org.apache.commons.lang.StringEscapeUtils; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo; @@ -102,7 +103,8 @@ public class TasksBlock extends HtmlBloc .append(join(pct, '%')).append("'> ").append("<div class='") .append(C_PROGRESSBAR_VALUE).append("' style='") .append(join("width:", pct, '%')).append("'> </div> </div>\",\"") - .append(info.getStatus()).append("\",\"") + .append(StringEscapeUtils.escapeJavaScript( + StringEscapeUtils.escapeHtml(info.getStatus()))).append("\",\"") .append(info.getState()).append("\",\"") .append(info.getStartTime()).append("\",\"") Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Tue Aug 19 23:49:39 2014 @@ -63,6 +63,13 @@ public class TestTaskAttemptListenerImpl public MockTaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager, + RMHeartbeatHandler rmHeartbeatHandler, AMPreemptionPolicy policy) { + + super(context, jobTokenSecretManager, rmHeartbeatHandler, policy); + } + + public MockTaskAttemptListenerImpl(AppContext context, + JobTokenSecretManager jobTokenSecretManager, RMHeartbeatHandler rmHeartbeatHandler, TaskHeartbeatHandler hbHandler, 
AMPreemptionPolicy policy) { @@ -210,7 +217,7 @@ public class TestTaskAttemptListenerImpl when(appCtx.getEventHandler()).thenReturn(ea); CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); policy.init(appCtx); - TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl( + TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl( appCtx, secret, rmHeartbeatHandler, policy) { @Override protected void registerHeartbeatHandler(Configuration conf) { @@ -271,7 +278,7 @@ public class TestTaskAttemptListenerImpl when(appCtx.getEventHandler()).thenReturn(ea); CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); policy.init(appCtx); - TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl( + TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl( appCtx, secret, rmHeartbeatHandler, policy) { @Override protected void registerHeartbeatHandler(Configuration conf) { @@ -326,7 +333,7 @@ public class TestTaskAttemptListenerImpl when(appCtx.getEventHandler()).thenReturn(ea); CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); policy.init(appCtx); - TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl( + TaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl( appCtx, secret, rmHeartbeatHandler, policy) { @Override protected void registerHeartbeatHandler(Configuration conf) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java Tue Aug 19 23:49:39 2014 @@ -24,7 +24,7 @@ import java.io.DataInputStream; import java.util.ArrayList; import java.util.Arrays; -import static junit.framework.Assert.*; +import static org.junit.Assert.*; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original) +++ 
hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Tue Aug 19 23:49:39 2014 @@ -18,7 +18,7 @@ package org.apache.hadoop.mapreduce.jobhistory; -import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -28,15 +28,21 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.never; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -52,6 +58,10 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.junit.After; +import org.junit.AfterClass; +import static org.junit.Assert.assertFalse; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; @@ -60,6 +70,26 @@ public class TestJobHistoryEventHandler private static final Log LOG = LogFactory .getLog(TestJobHistoryEventHandler.class); + private static MiniDFSCluster dfsCluster = null; + private static String coreSitePath; + + @BeforeClass + public static void setUpClass() throws Exception { + coreSitePath = "." 
+ File.separator + "target" + File.separator + + "test-classes" + File.separator + "core-site.xml"; + Configuration conf = new HdfsConfiguration(); + dfsCluster = new MiniDFSCluster.Builder(conf).build(); + } + + @AfterClass + public static void cleanUpClass() throws Exception { + dfsCluster.shutdown(); + } + + @After + public void cleanTest() throws Exception { + new File(coreSitePath).delete(); + } @Test (timeout=50000) public void testFirstFlushOnCompletionEvent() throws Exception { @@ -325,6 +355,50 @@ public class TestJobHistoryEventHandler } } + @Test (timeout=50000) + public void testDefaultFsIsUsedForHistory() throws Exception { + // Create default configuration pointing to the minicluster + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + dfsCluster.getURI().toString()); + FileOutputStream os = new FileOutputStream(coreSitePath); + conf.writeXml(os); + os.close(); + + // simulate execution under a non-default namenode + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + "file:///"); + + TestParams t = new TestParams(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.dfsWorkDir); + + JHEvenHandlerForTest realJheh = + new JHEvenHandlerForTest(t.mockAppContext, 0, false); + JHEvenHandlerForTest jheh = spy(realJheh); + jheh.init(conf); + + try { + jheh.start(); + handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent( + t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000))); + + handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent( + TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(), + new Counters(), new Counters()))); + + // If we got here then event handler worked but we don't know with which + // file system. Now we check that history stuff was written to minicluster + FileSystem dfsFileSystem = dfsCluster.getFileSystem(); + assertTrue("Minicluster contains some history files", + dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0); + FileSystem localFileSystem = LocalFileSystem.get(conf); + assertFalse("No history directory on non-default file system", + localFileSystem.exists(new Path(t.dfsWorkDir))); + } finally { + jheh.stop(); + } + } + private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) { jheh.handle(event); } @@ -372,6 +446,7 @@ public class TestJobHistoryEventHandler private class TestParams { boolean isLastAMRetry; String workDir = setupTestWorkDir(); + String dfsWorkDir = "/" + this.getClass().getCanonicalName(); ApplicationId appId = ApplicationId.newInstance(200, 1); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); @@ -451,10 +526,16 @@ public class TestJobHistoryEventHandler class JHEvenHandlerForTest extends JobHistoryEventHandler { private EventWriter eventWriter; + private boolean mockHistoryProcessing = true; public JHEvenHandlerForTest(AppContext context, int startCount) { super(context, startCount); } + public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) { + super(context, startCount); + this.mockHistoryProcessing = mockHistoryProcessing; + } + @Override protected void serviceStart() { } @@ -462,7 +543,12 @@ class JHEvenHandlerForTest extends JobHi @Override protected EventWriter createEventWriter(Path historyFilePath) throws IOException { - this.eventWriter = mock(EventWriter.class); + if (mockHistoryProcessing) { + this.eventWriter = mock(EventWriter.class); + } + else { + this.eventWriter = super.createEventWriter(historyFilePath); + } return 
this.eventWriter; } @@ -475,8 +561,13 @@ class JHEvenHandlerForTest extends JobHi } @Override - protected void processDoneFiles(JobId jobId){ - // do nothing + protected void processDoneFiles(JobId jobId) throws IOException { + if (!mockHistoryProcessing) { + super.processDoneFiles(jobId); + } + else { + // do nothing + } } } @@ -501,4 +592,4 @@ class JHEventHandlerForSigtermTest exten this.lastEventHandled = event; this.eventsHandled++; } -} \ No newline at end of file +} Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Aug 19 23:49:39 2014 @@ -24,8 +24,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.EnumSet; -import junit.framework.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; @@ -101,6 +100,7 @@ import org.apache.hadoop.yarn.state.Stat import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import org.junit.Assert; /** @@ -228,8 +228,8 @@ public class MRApp extends MRAppMaster { int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount, Clock clock, boolean unregistered, String assignedQueue) { - super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System - .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS); + super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, + System.currentTimeMillis()); this.testWorkDir = new File("target", testName); testAbsPath = new Path(testWorkDir.getAbsolutePath()); LOG.info("PathUsed: " + testAbsPath); @@ -573,7 +573,8 @@ public class MRApp extends MRAppMaster { Resource resource = Resource.newInstance(1234, 2); ContainerTokenIdentifier containerTokenIdentifier = new ContainerTokenIdentifier(cId, nodeId.toString(), "user", - resource, System.currentTimeMillis() + 10000, 42, 42); + resource, System.currentTimeMillis() + 10000, 42, 42, + Priority.newInstance(0), 0); Token containerToken = newContainerToken(nodeId, "password".getBytes(), containerTokenIdentifier); Container container = Container.newInstance(cId, nodeId, @@ -627,10 +628,18 @@ public class MRApp extends MRAppMaster { throws IOException { 
committer.abortJob(jobContext, state); } + + @Override + public boolean isRecoverySupported(JobContext jobContext) throws IOException{ + return committer.isRecoverySupported(jobContext); + } + + @SuppressWarnings("deprecation") @Override public boolean isRecoverySupported() { return committer.isRecoverySupported(); } + @Override public void setupTask(TaskAttemptContext taskContext) throws IOException { Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java Tue Aug 19 23:49:39 2014 @@ -143,4 +143,9 @@ public class MockAppContext implements A return true; } +@Override + public String getNMHostname() { + // bogus - Not Required + return null; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java Tue Aug 19 23:49:39 2014 @@ -21,7 +21,7 @@ package org.apache.hadoop.mapreduce.v2.a import java.util.Iterator; import java.util.List; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original) +++ 
hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Tue Aug 19 23:49:39 2014 @@ -23,7 +23,7 @@ import java.net.InetSocketAddress; import java.util.Iterator; import java.util.Map; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.TaskAttemptListenerImpl; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java Tue Aug 19 23:49:39 2014 @@ -253,6 +253,12 @@ public class TestJobEndNotifier extends HttpServer2 server = startHttpServer(); MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false, this.getClass().getName(), true, 2, false)); + // Currently, we will have isLastRetry always equals to false at beginning + // of MRAppMaster, except staging area exists or commit already started at + // the beginning. + // Now manually set isLastRetry to true and this should reset to false when + // unregister failed. + app.isLastAMRetry = true; doNothing().when(app).sysexit(); JobConf conf = new JobConf(); conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, @@ -264,13 +270,13 @@ public class TestJobEndNotifier extends app.waitForInternalState(job, JobStateInternal.REBOOT); // Now shutdown. User should see FAILED state. 
// Unregistration fails: isLastAMRetry is recalculated, this is - app.shutDownJob(); - Assert.assertTrue(app.isLastAMRetry()); - Assert.assertEquals(1, JobEndServlet.calledTimes); - Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED", - JobEndServlet.requestUri.getQuery()); - Assert.assertEquals(JobState.FAILED.toString(), - JobEndServlet.foundJobState); + ///reboot will stop service internally, we don't need to shutdown twice + app.waitForServiceToStop(10000); + Assert.assertFalse(app.isLastAMRetry()); + // Since it's not last retry, JobEndServlet didn't called + Assert.assertEquals(0, JobEndServlet.calledTimes); + Assert.assertNull(JobEndServlet.requestUri); + Assert.assertNull(JobEndServlet.foundJobState); server.stop(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Tue Aug 19 23:49:39 2014 @@ -22,7 +22,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.CountDownLatch; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -214,6 +214,87 @@ public class TestKill { app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); } + static class MyAsyncDispatch extends AsyncDispatcher { + private CountDownLatch latch; + private TaskAttemptEventType attemptEventTypeToWait; + MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) { + super(); + this.latch = latch; + this.attemptEventTypeToWait = attemptEventTypeToWait; + } + + @Override + protected void dispatch(Event event) { + if (event instanceof TaskAttemptEvent) { + TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event; + TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID(); + if (attemptEvent.getType() == this.attemptEventTypeToWait + && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + super.dispatch(event); + } + } + + // This is to test a race condition where JobEventType.JOB_KILL is generated + // right after TaskAttemptEventType.TA_DONE is generated. + // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED + // and T_ATTEMPT_KILLED from the same attempt. 
+ @Test + public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + final Dispatcher dispatcher = new MyAsyncDispatch(latch, TaskAttemptEventType.TA_DONE); + MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) { + @Override + public Dispatcher createDispatcher() { + return dispatcher; + } + }; + Job job = app.submit(new Configuration()); + JobId jobId = app.getJobId(); + app.waitForState(job, JobState.RUNNING); + Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size()); + Iterator<Task> it = job.getTasks().values().iterator(); + Task mapTask = it.next(); + Task reduceTask = it.next(); + app.waitForState(mapTask, TaskState.RUNNING); + app.waitForState(reduceTask, TaskState.RUNNING); + TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next(); + app.waitForState(mapAttempt, TaskAttemptState.RUNNING); + TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next(); + app.waitForState(reduceAttempt, TaskAttemptState.RUNNING); + + // The order in the dispatch event queue, from the oldest to the newest + // TA_DONE + // JOB_KILL + // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling ) + // T_KILL ( from JOB_KILL's handling ) + // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling ) + // TA_KILL ( from T_KILL's handling ) + // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling ) + // T_ATTEMPT_KILLED ( from TA_KILL's handling ) + + // Finish map + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + mapAttempt.getID(), + TaskAttemptEventType.TA_DONE)); + + // Now kill the job + app.getContext().getEventHandler() + .handle(new JobEvent(jobId, JobEventType.JOB_KILL)); + + //unblock + latch.countDown(); + + app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED); + } + @Test public void testKillTaskAttempt() throws Exception { final CountDownLatch latch = new CountDownLatch(1); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Tue Aug 19 23:49:39 2014 @@ -26,7 +26,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -112,6 +112,15 @@ public class TestMRApp { //wait for first attempt to commit pending app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING); + //re-send the commit pending signal to the task + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + attempt.getID(), + TaskAttemptEventType.TA_COMMIT_PENDING)); + + //the task attempt should be still at COMMIT_PENDING + 
     app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING);
+
     //send the done signal to the task
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppComponentDependencies.java Tue Aug 19 23:49:39 2014
@@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.io.IOException;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Tue Aug 19 23:49:39 2014
@@ -31,7 +31,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -118,7 +118,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMasterTest appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+            System.currentTimeMillis());
     JobConf conf = new JobConf();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -147,8 +147,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -186,8 +185,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -225,8 +223,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -264,8 +261,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -285,8 +281,9 @@ public class TestMRAppMaster {
 
   @Test (timeout = 30000)
   public void testMRAppMasterMaxAppAttempts() throws IOException,
       InterruptedException {
-    int[] maxAppAttemtps = new int[] { 1, 2, 3 };
-    Boolean[] expectedBools = new Boolean[]{ true, true, false };
+    // No matter what the maxAppAttempts or attempt id is, isLastRetry always
+    // equals false
+    Boolean[] expectedBools = new Boolean[]{ false, false, false };
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
     String containerIdStr = "container_1317529182569_0004_000002_1";
@@ -301,10 +298,10 @@ public class TestMRAppMaster {
     File stagingDir =
         new File(MRApps.getStagingAreaDir(conf, userName).toString());
     stagingDir.mkdirs();
-    for (int i = 0; i < maxAppAttemtps.length; ++i) {
+    for (int i = 0; i < expectedBools.length; ++i) {
       MRAppMasterTest appMaster =
           new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-              System.currentTimeMillis(), maxAppAttemtps[i], false, true);
+              System.currentTimeMillis(), false, true);
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
       assertEquals("isLastAMRetry is correctly computed.", expectedBools[i],
           appMaster.isLastAMRetry());
@@ -399,7 +396,7 @@ public class TestMRAppMaster {
 
     MRAppMasterTest appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), 1, false, true);
+            System.currentTimeMillis(), false, true);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
 
     // Now validate the task credentials
@@ -466,16 +463,15 @@ class MRAppMasterTest extends MRAppMaste
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime, int maxAppAttempts) {
+      long submitTime) {
     this(applicationAttemptId, containerId, host, port, httpPort,
-        submitTime, maxAppAttempts, true, true);
+        submitTime, true, true);
   }
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime, int maxAppAttempts, boolean overrideInit,
+      long submitTime, boolean overrideInit,
       boolean overrideStart) {
-    super(applicationAttemptId, containerId, host, port, httpPort, submitTime,
-        maxAppAttempts);
+    super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
     this.overrideInit = overrideInit;
     this.overrideStart = overrideStart;
     mockContainerAllocator = mock(ContainerAllocator.class);
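As the updated testMRAppMasterMaxAppAttempts shows, isLastAMRetry no longer
depends on a max-attempts count passed into the AM. A condensed sketch of the
new expectation follows; it is illustrative only, reusing the MRAppMasterTest
harness from the hunks above, and is not itself part of the patch:

    // Sketch only: applicationAttemptId, containerId, conf and userName are
    // the fixture values set up earlier in TestMRAppMaster; note the
    // constructor no longer takes a maxAppAttempts argument.
    MRAppMasterTest appMaster =
        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
            System.currentTimeMillis(), false, true);
    MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
    // A just-started AM no longer assumes it is on its last retry.
    Assert.assertFalse("isLastAMRetry should start out false",
        appMaster.isLastAMRetry());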
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,7 @@ import java.security.PrivilegedException
 import java.util.Iterator;
 import java.util.List;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Aug 19 23:49:39 2014
@@ -34,7 +34,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -425,6 +425,266 @@ public class TestRecovery {
     app.verifyCompleted();
   }
 
+  /**
+   * This class provides a custom implementation of the output committer's
+   * isRecoverySupported method, which determines whether recovery is
+   * supported based on a config property.
+   */
+  public static class TestFileOutputCommitter extends
+      org.apache.hadoop.mapred.FileOutputCommitter {
+
+    @Override
+    public boolean isRecoverySupported(
+        org.apache.hadoop.mapred.JobContext jobContext) {
+      boolean isRecoverySupported = false;
+      if (jobContext != null && jobContext.getConfiguration() != null) {
+        isRecoverySupported = jobContext.getConfiguration().getBoolean(
+            "want.am.recovery", false);
+      }
+      return isRecoverySupported;
+    }
+  }
+
+  /**
+   * This test case verifies that recovery is controlled through a config
+   * property. In this case, recovery is turned ON: an AM with 3 maps and
+   * 0 reduces crashes after the first two tasks finish, then recovers
+   * completely and succeeds in the second generation.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRecoverySuccessUsingCustomOutputCommitter() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(3, 0, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean("want.am.recovery", true);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // all maps would be running
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator()
+        .next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator()
+        .next();
+    TaskAttempt task3Attempt = mapTask3.getAttempts().values().iterator()
+        .next();
+
+    // before sending the TA_DONE event, make sure each attempt has come to
+    // the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the first two maps
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun
+    // in the rerun, the first two maps will be recovered from the previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean("want.am.recovery", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // Set num-reduces explicitly in conf as the recovery logic depends on it.
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // the first two maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has come to
+    // the RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 3rd map task
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(mapTask3.getAttempts().values().iterator()
+                .next().getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for it to complete
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
+  /**
+   * This test case verifies that recovery is controlled through a config
+   * property. In this case, recovery is turned OFF: an AM with 3 maps and
+   * 0 reduces crashes after the first two tasks finish; recovery is skipped,
+   * so the job reruns fully in the second generation and succeeds.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRecoveryFailsUsingCustomOutputCommitter() throws Exception {
+    int runCount = 0;
+    MRApp app =
+        new MRAppWithHistory(3, 0, false, this.getClass().getName(), true,
+            ++runCount);
+    Configuration conf = new Configuration();
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean("want.am.recovery", false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // all maps would be running
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt =
+        mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt =
+        mapTask2.getAttempts().values().iterator().next();
+    TaskAttempt task3Attempt =
+        mapTask3.getAttempts().values().iterator().next();
+
+    // before sending the TA_DONE event, make sure each attempt has come to
+    // the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the first two maps
+    app
+        .getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(), TaskAttemptEventType.TA_DONE));
+    app
+        .getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun
+    // in the rerun, the first two maps will NOT be recovered from the
+    // previous run
+    app =
+        new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+            ++runCount);
+    conf = new Configuration();
+    conf.setClass("mapred.output.committer.class", TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean("want.am.recovery", false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // Set num-reduces explicitly in conf as the recovery logic depends on it.
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // the first two maps will NOT be recovered, so done must be sent for them
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has come to
+    // the RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to all 3 map tasks
+    app
+        .getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(mapTask1.getAttempts().values().iterator().next()
+                .getID(), TaskAttemptEventType.TA_DONE));
+    app
+        .getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(mapTask2.getAttempts().values().iterator().next()
+                .getID(), TaskAttemptEventType.TA_DONE));
+
+    app
+        .getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(mapTask3.getAttempts().values().iterator().next()
+                .getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for them to complete
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   @Test
   public void testMultipleCrashes() throws Exception {
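The two recovery tests above differ only in the "want.am.recovery" flag. A
minimal, self-contained sketch of the same gating pattern follows; the class
name is hypothetical and merely mirrors TestFileOutputCommitter from the
patch, so treat it as an illustration rather than the committed code:

    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.JobContext;

    // Recovery support is gated on a job-level flag: with "want.am.recovery"
    // set to true, a restarted AM may recover already-completed tasks;
    // otherwise the second generation reruns them from scratch.
    public class ConfigGatedOutputCommitter extends FileOutputCommitter {
      @Override
      public boolean isRecoverySupported(JobContext jobContext) {
        // Defensive null checks, as in TestFileOutputCommitter above.
        return jobContext != null
            && jobContext.getConfiguration() != null
            && jobContext.getConfiguration().getBoolean(
                "want.am.recovery", false);
      }
    }

Wired into a job exactly as in the tests, via conf.setClass(
"mapred.output.committer.class", ..., org.apache.hadoop.mapred.OutputCommitter.class),
the boolean flag alone decides which of the two paths above a rerun takes.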
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Tue Aug 19 23:49:39 2014
@@ -879,5 +879,10 @@ public class TestRuntimeEstimators {
       return true;
     }
 
+    @Override
+    public String getNMHostname() {
+      // bogus - Not Required
+      return null;
+    }
   }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
@@ -28,9 +29,6 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,13 +60,14 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
  * Make sure that the job staging directory clean up happens.
  */
-public class TestStagingCleanup extends TestCase {
+public class TestStagingCleanup {
 
   private Configuration conf = new Configuration();
   private FileSystem fs;
@@ -81,7 +80,7 @@ import org.junit.Test;
   public void testDeletionofStagingOnUnregistrationFailure()
       throws IOException {
     testDeletionofStagingOnUnregistrationFailure(2, false);
-    testDeletionofStagingOnUnregistrationFailure(1, true);
+    testDeletionofStagingOnUnregistrationFailure(1, false);
   }
 
   @SuppressWarnings("resource")
@@ -104,7 +103,7 @@ import org.junit.Test;
     appMaster.init(conf);
     appMaster.start();
     appMaster.shutDownJob();
-    ((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry();
+    ((RunningAppContext) appMaster.getContext()).resetIsLastAMRetry();
     if (shouldHaveDeleted) {
       Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry());
       verify(fs).delete(stagingJobPath, true);
@@ -164,7 +163,11 @@ import org.junit.Test;
     verify(fs, times(0)).delete(stagingJobPath, true);
   }
 
-  @Test (timeout = 30000)
+  // FIXME:
+  // This test is disabled because currently, when the job state is REBOOT at
+  // shutdown and lastRetry = true in the RM's view, cleanup is not performed.
+  // This will be supported once YARN-2261 is completed.
+//  @Test (timeout = 30000)
   public void testDeletionofStagingOnReboot() throws IOException {
     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
     fs = mock(FileSystem.class);
@@ -202,7 +205,7 @@ import org.junit.Test;
     JobId jobid = recordFactory.newRecordInstance(JobId.class);
     jobid.setAppId(appId);
     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4);
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
    appMaster.init(conf);
     //simulate the process being killed
     MRAppMaster.MRAppMasterShutdownHook hook =
@@ -210,8 +213,12 @@ import org.junit.Test;
     hook.run();
     verify(fs, times(0)).delete(stagingJobPath, true);
   }
-
-  @Test (timeout = 30000)
+
+  // FIXME:
+  // This test is disabled because currently, when the shutdown hook is
+  // triggered on the last retry in the RM's view, cleanup is not performed.
+  // This should be supported once YARN-2261 is completed.
+//  @Test (timeout = 30000)
   public void testDeletionofStagingOnKillLastTry() throws IOException {
     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
     fs = mock(FileSystem.class);
@@ -226,7 +233,7 @@ import org.junit.Test;
     JobId jobid = recordFactory.newRecordInstance(JobId.class);
     jobid.setAppId(appId);
     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 1); //no retry
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); //no retry
     appMaster.init(conf);
     assertTrue("appMaster.isLastAMRetry() is false", appMaster.isLastAMRetry());
     //simulate the process being killed
@@ -245,10 +252,10 @@ import org.junit.Test;
     boolean crushUnregistration = false;
 
     public TestMRApp(ApplicationAttemptId applicationAttemptId,
-        ContainerAllocator allocator, int maxAppAttempts) {
+        ContainerAllocator allocator) {
       super(applicationAttemptId, ContainerId.newInstance(
           applicationAttemptId, 1), "testhost", 2222, 3333,
-          System.currentTimeMillis(), maxAppAttempts);
+          System.currentTimeMillis());
       this.allocator = allocator;
       this.successfullyUnregistered.set(true);
     }
@@ -256,7 +263,7 @@ import org.junit.Test;
     public TestMRApp(ApplicationAttemptId applicationAttemptId,
         ContainerAllocator allocator, JobStateInternal jobStateInternal,
         int maxAppAttempts) {
-      this(applicationAttemptId, allocator, maxAppAttempts);
+      this(applicationAttemptId, allocator);
       this.jobStateInternal = jobStateInternal;
     }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.java Tue Aug 19 23:49:39 2014
@@ -27,7 +27,7 @@ import static org.mockito.Mockito.when;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobContext;
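Across all of these test files the same mechanical migration recurs: the
deprecated junit.framework.Assert is replaced by org.junit.Assert, and
TestCase subclassing gives way to JUnit 4 annotations. In miniature (a
hypothetical test class, shown only to illustrate the pattern):

    import org.junit.Assert;
    import org.junit.Test;

    public class ExampleMigratedTest {
      // No TestCase superclass: JUnit 4 discovers tests via the annotation,
      // and the timeout attribute replaces ad-hoc watchdog code.
      @Test (timeout = 30000)
      public void testSomething() {
        Assert.assertEquals("unexpected sum", 4, 2 + 2);
      }
    }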