Author: cnauroth Date: Fri Dec 20 01:01:18 2013 New Revision: 1552467 URL: http://svn.apache.org/r1552467 Log: Merge trunk to HDFS-4685.
Added: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/ - copied from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/ - copied from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointID.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointID.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointNamingService.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointNamingService.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointService.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/CheckpointService.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/FSCheckpointID.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/FSCheckpointID.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/FSCheckpointService.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/FSCheckpointService.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/RandomNameCNS.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/RandomNameCNS.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/SimpleNamingService.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/SimpleNamingService.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/ - copied from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/TestFSCheckpointID.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/TestFSCheckpointID.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/TestFSCheckpointService.java - copied unchanged from r1552465, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/checkpoint/TestFSCheckpointService.java Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed) hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1551332-1552465 Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt Fri Dec 20 01:01:18 2013 @@ -71,6 +71,12 @@ Trunk (Unreleased) MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing. (Srikanth Sundarrajan via amareshwari) + MAPREDUCE-5197. Add a service for checkpointing task state. + (Carlo Curino via cdouglas) + + MAPREDUCE-5189. Add policies and wiring to respond to preemption requests + from YARN. (Carlo Curino via cdouglas) + BUG FIXES MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant. @@ -240,6 +246,12 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5623. TestJobCleanup fails because of RejectedExecutionException and NPE. (jlowe) + MAPREDUCE-5679. TestJobHistoryParsing has race condition (Liyin Liang via + jlowe) + + MAPREDUCE-5687. Fixed failure in TestYARNRunner caused by YARN-1446. (Jian He + via vinodkv) + Release 2.3.0 - UNRELEASED INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1551332-1552465 Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1551332-1552465 Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Fri Dec 20 01:01:18 2013 @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.PolicyProvider; @@ -84,14 +85,17 @@ public class TaskAttemptListenerImpl ext .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); private JobTokenSecretManager jobTokenSecretManager = null; + private AMPreemptionPolicy preemptionPolicy; public TaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager, - RMHeartbeatHandler rmHeartbeatHandler) { + RMHeartbeatHandler rmHeartbeatHandler, + AMPreemptionPolicy preemptionPolicy) { super(TaskAttemptListenerImpl.class.getName()); this.context = context; this.jobTokenSecretManager = jobTokenSecretManager; this.rmHeartbeatHandler = rmHeartbeatHandler; + this.preemptionPolicy = preemptionPolicy; } @Override Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Dec 20 01:01:18 2013 @@ -102,6 +102,8 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; @@ -188,8 +190,8 @@ public class MRAppMaster extends Composi private ContainerLauncher containerLauncher; private EventHandler<CommitterEvent> committerEventHandler; private Speculator speculator; - private TaskAttemptListener taskAttemptListener; - private JobTokenSecretManager jobTokenSecretManager = + protected TaskAttemptListener taskAttemptListener; + protected JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); private JobId jobId; private boolean newApiCommitter; @@ -197,6 +199,7 @@ public class MRAppMaster extends Composi private JobEventDispatcher jobEventDispatcher; private JobHistoryEventHandler jobHistoryEventHandler; private SpeculatorEventDispatcher speculatorEventDispatcher; + private AMPreemptionPolicy preemptionPolicy; private Job job; private Credentials jobCredentials = new Credentials(); // Filled during init @@ -383,8 +386,12 @@ public class MRAppMaster extends Composi committerEventHandler = createCommitterEventHandler(context, committer); addIfService(committerEventHandler); + //policy handling preemption requests from RM + preemptionPolicy = createPreemptionPolicy(conf); + preemptionPolicy.init(context); + //service to handle requests to TaskUmbilicalProtocol - taskAttemptListener = createTaskAttemptListener(context); + taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy); addIfService(taskAttemptListener); //service to log job history events @@ -475,6 +482,12 @@ public class MRAppMaster extends Composi return committer; } + protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) { + return ReflectionUtils.newInstance(conf.getClass( + MRJobConfig.MR_AM_PREEMPTION_POLICY, + NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf); + } + protected boolean keepJobFiles(JobConf conf) { return (conf.getKeepTaskFilesPattern() != null || conf .getKeepFailedTaskFiles()); @@ -692,10 +705,11 @@ public class MRAppMaster extends Composi } } - protected TaskAttemptListener createTaskAttemptListener(AppContext context) { + protected TaskAttemptListener createTaskAttemptListener(AppContext context, + AMPreemptionPolicy preemptionPolicy) { TaskAttemptListener lis = new TaskAttemptListenerImpl(context, jobTokenSecretManager, - getRMHeartbeatHandler()); + getRMHeartbeatHandler(), preemptionPolicy); return lis; } @@ -805,7 +819,7 @@ public class MRAppMaster extends Composi , containerID); } else { this.containerAllocator = new RMContainerAllocator( - this.clientService, this.context); + this.clientService, this.context, preemptionPolicy); } ((Service)this.containerAllocator).init(getConfig()); ((Service)this.containerAllocator).start(); Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Dec 20 01:01:18 2013 @@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -147,13 +149,17 @@ public class RMContainerAllocator extend private long retryInterval; private long retrystartTime; + private final AMPreemptionPolicy preemptionPolicy; + BlockingQueue<ContainerAllocatorEvent> eventQueue = new LinkedBlockingQueue<ContainerAllocatorEvent>(); private ScheduleStats scheduleStats = new ScheduleStats(); - public RMContainerAllocator(ClientService clientService, AppContext context) { + public RMContainerAllocator(ClientService clientService, AppContext context, + AMPreemptionPolicy preemptionPolicy) { super(clientService, context); + this.preemptionPolicy = preemptionPolicy; this.stopped = new AtomicBoolean(false); } @@ -361,11 +367,15 @@ public class RMContainerAllocator extend LOG.error("Could not deallocate container for task attemptId " + aId); } + preemptionPolicy.handleCompletedContainer(event.getAttemptID()); } else if ( event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) { ContainerFailedEvent fEv = (ContainerFailedEvent) event; String host = getHost(fEv.getContMgrAddress()); containerFailedOnHost(host); + // propagate failures to preemption policy to discard checkpoints for + // failed tasks + preemptionPolicy.handleFailedContainer(event.getAttemptID()); } } @@ -399,7 +409,7 @@ public class RMContainerAllocator extend } scheduledRequests.reduces.clear(); - //preempt for making space for atleast one map + //preempt for making space for at least one map int premeptionLimit = Math.max(mapResourceReqt, (int) (maxReducePreemptionLimit * memLimit)); @@ -409,7 +419,7 @@ public class RMContainerAllocator extend int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt); toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); - LOG.info("Going to preempt " + toPreempt); + LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); assignedRequests.preemptReduce(toPreempt); } } @@ -595,6 +605,14 @@ public class RMContainerAllocator extend } List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses(); + + // propagate preemption requests + final PreemptionMessage preemptReq = response.getPreemptionMessage(); + if (preemptReq != null) { + preemptionPolicy.preempt( + new PreemptionContext(assignedRequests), preemptReq); + } + if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) { //something changed recalculateReduceSchedule = true; @@ -630,7 +648,9 @@ public class RMContainerAllocator extend String diagnostics = StringInterner.weakIntern(cont.getDiagnostics()); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnostics)); - } + + preemptionPolicy.handleCompletedContainer(attemptID); + } } return newContainers; } @@ -1232,4 +1252,27 @@ public class RMContainerAllocator extend " RackLocal:" + rackLocalAssigned); } } + + static class PreemptionContext extends AMPreemptionPolicy.Context { + final AssignedRequests reqs; + + PreemptionContext(AssignedRequests reqs) { + this.reqs = reqs; + } + @Override + public TaskAttemptId getTaskAttempt(ContainerId container) { + return reqs.get(container); + } + + @Override + public List<Container> getContainers(TaskType t){ + if(TaskType.REDUCE.equals(t)) + return new ArrayList<Container>(reqs.reduces.values()); + if(TaskType.MAP.equals(t)) + return new ArrayList<Container>(reqs.maps.values()); + return null; + } + + } + } Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Dec 20 01:01:18 2013 @@ -60,7 +60,7 @@ public class TestTaskAttemptListenerImpl JobTokenSecretManager jobTokenSecretManager, RMHeartbeatHandler rmHeartbeatHandler, TaskHeartbeatHandler hbHandler) { - super(context, jobTokenSecretManager, rmHeartbeatHandler); + super(context, jobTokenSecretManager, rmHeartbeatHandler, null); this.taskHeartbeatHandler = hbHandler; } @@ -191,7 +191,7 @@ public class TestTaskAttemptListenerImpl mock(RMHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); TaskAttemptListenerImpl listener = - new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -245,7 +245,7 @@ public class TestTaskAttemptListenerImpl mock(RMHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); TaskAttemptListenerImpl listener = - new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Dec 20 01:01:18 2013 @@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; @@ -467,7 +468,8 @@ public class MRApp extends MRAppMaster { } @Override - protected TaskAttemptListener createTaskAttemptListener(AppContext context) { + protected TaskAttemptListener createTaskAttemptListener( + AppContext context, AMPreemptionPolicy policy) { return new TaskAttemptListener(){ @Override public InetSocketAddress getAddress() { Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Fri Dec 20 01:01:18 2013 @@ -33,6 +33,8 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -61,6 +63,8 @@ public class MRAppBenchmark { /** * Runs memory and time benchmark with Mock MRApp. + * @param app Application to submit + * @throws Exception On application failure */ public void run(MRApp app) throws Exception { Logger rootLogger = LogManager.getRootLogger(); @@ -133,6 +137,7 @@ public class MRAppBenchmark { protected void serviceStart() throws Exception { thread = new Thread(new Runnable() { @Override + @SuppressWarnings("unchecked") public void run() { ContainerAllocatorEvent event = null; while (!Thread.currentThread().isInterrupted()) { @@ -192,7 +197,9 @@ public class MRAppBenchmark { @Override protected ContainerAllocator createContainerAllocator( ClientService clientService, AppContext context) { - return new RMContainerAllocator(clientService, context) { + + AMPreemptionPolicy policy = new NoopAMPreemptionPolicy(); + return new RMContainerAllocator(clientService, context, policy) { @Override protected ApplicationMasterProtocol createSchedulerProxy() { return new ApplicationMasterProtocol() { Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Fri Dec 20 01:01:18 2013 @@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -247,13 +248,14 @@ public class TestFail { super(maps, reduces, false, "TimeOutTaskMRApp", true); } @Override - protected TaskAttemptListener createTaskAttemptListener(AppContext context) { + protected TaskAttemptListener createTaskAttemptListener( + AppContext context, AMPreemptionPolicy policy) { //This will create the TaskAttemptListener with TaskHeartbeatHandler //RPC servers are not started //task time out is reduced //when attempt times out, heartbeat handler will send the lost event //leading to Attempt failure - return new TaskAttemptListenerImpl(getContext(), null, null) { + return new TaskAttemptListenerImpl(getContext(), null, null, policy) { @Override public void startRpcServer(){}; @Override Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Dec 20 01:01:18 2013 @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.app; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; + import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.isA; @@ -1428,14 +1430,15 @@ public class TestRMContainerAllocator { // Use this constructor when using a real job. MyContainerAllocator(MyResourceManager rm, ApplicationAttemptId appAttemptId, AppContext context) { - super(createMockClientService(), context); + super(createMockClientService(), context, new NoopAMPreemptionPolicy()); this.rm = rm; } // Use this constructor when you are using a mocked job. public MyContainerAllocator(MyResourceManager rm, Configuration conf, ApplicationAttemptId appAttemptId, Job job) { - super(createMockClientService(), createAppContext(appAttemptId, job)); + super(createMockClientService(), createAppContext(appAttemptId, job), + new NoopAMPreemptionPolicy()); this.rm = rm; super.init(conf); super.start(); @@ -1444,7 +1447,8 @@ public class TestRMContainerAllocator { public MyContainerAllocator(MyResourceManager rm, Configuration conf, ApplicationAttemptId appAttemptId, Job job, Clock clock) { super(createMockClientService(), - createAppContext(appAttemptId, job, clock)); + createAppContext(appAttemptId, job, clock), + new NoopAMPreemptionPolicy()); this.rm = rm; super.init(conf); super.start(); @@ -1671,7 +1675,8 @@ public class TestRMContainerAllocator { ApplicationId.newInstance(1, 1)); RMContainerAllocator allocator = new RMContainerAllocator( - mock(ClientService.class), appContext) { + mock(ClientService.class), appContext, + new NoopAMPreemptionPolicy()) { @Override protected void register() { } @@ -1721,7 +1726,8 @@ public class TestRMContainerAllocator { @Test public void testCompletedContainerEvent() { RMContainerAllocator allocator = new RMContainerAllocator( - mock(ClientService.class), mock(AppContext.class)); + mock(ClientService.class), mock(AppContext.class), + new NoopAMPreemptionPolicy()); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId( MRBuilderUtils.newTaskId( Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java Fri Dec 20 01:01:18 2013 @@ -45,5 +45,9 @@ public enum JobCounter { TOTAL_LAUNCHED_UBERTASKS, NUM_UBER_SUBMAPS, NUM_UBER_SUBREDUCES, - NUM_FAILED_UBERTASKS + NUM_FAILED_UBERTASKS, + TASKS_REQ_PREEMPT, + CHECKPOINTS, + CHECKPOINT_BYTES, + CHECKPOINT_TIME } Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Dec 20 01:01:18 2013 @@ -459,7 +459,13 @@ public interface MRJobConfig { public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = MR_AM_PREFIX + "job.reduce.preemption.limit"; public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f; - + + /** + * Policy class encoding responses to preemption requests. + */ + public static final String MR_AM_PREEMPTION_POLICY = + MR_AM_PREFIX + "preemption.policy"; + /** AM ACL disabled. **/ public static final String JOB_AM_ACCESS_DISABLED = "mapreduce.job.am-access-disabled"; @@ -708,4 +714,7 @@ public interface MRJobConfig { public static final String MR_APPLICATION_TYPE = "MAPREDUCE"; + public static final String TASK_PREEMPTION = + "mapreduce.job.preemption"; + } Propchange: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1551332-1552465 Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties Fri Dec 20 01:01:18 2013 @@ -27,3 +27,7 @@ SLOTS_MILLIS_MAPS.name= Total SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces in occupied slots (ms) FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms) FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms) +TASKS_REQ_PREEMPT.name= Tasks that have been asked to preempt +CHECKPOINTS.name= Number of checkpoints reported +CHECKPOINT_BYTES.name= Total amount of bytes in checkpoints +CHECKPOINT_TIME.name= Total time spent checkpointing (ms) \ No newline at end of file Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Fri Dec 20 01:01:18 2013 @@ -339,8 +339,11 @@ public class TestJobHistoryParsing { PrintStream stdps = System.out; try { System.setOut(new PrintStream(outContent)); - HistoryViewer viewer = new HistoryViewer(fc.makeQualified( - fileInfo.getHistoryFile()).toString(), conf, true); + HistoryViewer viewer; + synchronized (fileInfo) { + viewer = new HistoryViewer(fc.makeQualified( + fileInfo.getHistoryFile()).toString(), conf, true); + } viewer.print(); for (TaskInfo taskInfo : allTasks.values()) { @@ -397,29 +400,27 @@ public class TestJobHistoryParsing { // make sure all events are flushed app.waitForState(Service.STATE.STOPPED); - String jobhistoryDir = JobHistoryUtils - .getHistoryIntermediateDoneDirForUser(conf); JobHistory jobHistory = new JobHistory(); jobHistory.init(conf); + HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId); + + JobHistoryParser parser; + JobInfo jobInfo; + synchronized (fileInfo) { + Path historyFilePath = fileInfo.getHistoryFile(); + FSDataInputStream in = null; + FileContext fc = null; + try { + fc = FileContext.getFileContext(conf); + in = fc.open(fc.makeQualified(historyFilePath)); + } catch (IOException ioe) { + LOG.info("Can not open history file: " + historyFilePath, ioe); + throw (new Exception("Can not open History File")); + } - JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) - .getJobIndexInfo(); - String jobhistoryFileName = FileNameIndexUtils - .getDoneFileName(jobIndexInfo); - - Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); - FSDataInputStream in = null; - FileContext fc = null; - try { - fc = FileContext.getFileContext(conf); - in = fc.open(fc.makeQualified(historyFilePath)); - } catch (IOException ioe) { - LOG.info("Can not open history file: " + historyFilePath, ioe); - throw (new Exception("Can not open History File")); + parser = new JobHistoryParser(in); + jobInfo = parser.parse(); } - - JobHistoryParser parser = new JobHistoryParser(in); - JobInfo jobInfo = parser.parse(); Exception parseException = parser.getParseException(); Assert.assertNull("Caught an expected exception " + parseException, parseException); @@ -464,29 +465,28 @@ public class TestJobHistoryParsing { // make sure all events are flushed app.waitForState(Service.STATE.STOPPED); - String jobhistoryDir = JobHistoryUtils - .getHistoryIntermediateDoneDirForUser(conf); JobHistory jobHistory = new JobHistory(); jobHistory.init(conf); - JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) - .getJobIndexInfo(); - String jobhistoryFileName = FileNameIndexUtils - .getDoneFileName(jobIndexInfo); - - Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); - FSDataInputStream in = null; - FileContext fc = null; - try { - fc = FileContext.getFileContext(conf); - in = fc.open(fc.makeQualified(historyFilePath)); - } catch (IOException ioe) { - LOG.info("Can not open history file: " + historyFilePath, ioe); - throw (new Exception("Can not open History File")); - } + HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId); + + JobHistoryParser parser; + JobInfo jobInfo; + synchronized (fileInfo) { + Path historyFilePath = fileInfo.getHistoryFile(); + FSDataInputStream in = null; + FileContext fc = null; + try { + fc = FileContext.getFileContext(conf); + in = fc.open(fc.makeQualified(historyFilePath)); + } catch (IOException ioe) { + LOG.info("Can not open history file: " + historyFilePath, ioe); + throw (new Exception("Can not open History File")); + } - JobHistoryParser parser = new JobHistoryParser(in); - JobInfo jobInfo = parser.parse(); + parser = new JobHistoryParser(in); + jobInfo = parser.parse(); + } Exception parseException = parser.getParseException(); Assert.assertNull("Caught an expected exception " + parseException, parseException); Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Fri Dec 20 01:01:18 2013 @@ -304,7 +304,7 @@ public class TestClientRedirect { @Override public KillApplicationResponse forceKillApplication( KillApplicationRequest request) throws IOException { - return recordFactory.newRecordInstance(KillApplicationResponse.class); + return KillApplicationResponse.newInstance(true); } @Override Modified: hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java?rev=1552467&r1=1552466&r2=1552467&view=diff ============================================================================== --- hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java (original) +++ hadoop/common/branches/HDFS-4685/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Fri Dec 20 01:01:18 2013 @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.protoc import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -208,7 +209,7 @@ public class TestYARNRunner extends Test }; /* make sure kill calls finish application master */ when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class))) - .thenReturn(null); + .thenReturn(KillApplicationResponse.newInstance(true)); delegate.killApplication(appId); verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));