Author: cdouglas
Date: Tue Dec 17 22:54:31 2013
New Revision: 1551748

URL: http://svn.apache.org/r1551748
Log:
MAPREDUCE-5189. Add policies and wiring to respond to preemption requests from YARN. Contributed by Carlo Curino.
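For illustration only — not part of this change set — a job can opt into the new hook by setting the MRJobConfig.MR_AM_PREEMPTION_POLICY key that this patch adds; the MRAppMaster reads it in createPreemptionPolicy() and falls back to NoopAMPreemptionPolicy when unset. The driver class below is hypothetical; only the configuration key and the KillAMPreemptionPolicy class come from this patch.

  // Hypothetical driver snippet: select the sample kill-based policy for the AM.
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.MRJobConfig;

  public class PreemptionPolicySetup {
    public static Job createJob() throws Exception {
      Configuration conf = new Configuration();
      // Same key the AM resolves via conf.getClass(MR_AM_PREEMPTION_POLICY,
      // NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class)
      conf.set(MRJobConfig.MR_AM_PREEMPTION_POLICY,
          "org.apache.hadoop.mapreduce.v2.app.rm.preemption.KillAMPreemptionPolicy");
      Job job = Job.getInstance(conf, "preemption-aware-job");
      // ... set mapper, reducer, and input/output paths as usual ...
      return job;
    }
  }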
Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java   (with props)
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1551748&r1=1551747&r2=1551748&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Dec 17 22:54:31 2013
@@ -74,6 +74,9 @@ Trunk (Unreleased) MAPREDUCE-5197. Add a service for checkpointing task state.
(Carlo Curino via cdouglas) + MAPREDUCE-5189. Add policies and wiring to respond to preemption requests + from YARN. (Carlo Curino via cdouglas) + BUG FIXES MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant. Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Tue Dec 17 22:54:31 2013 @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.PolicyProvider; @@ -84,14 +85,17 @@ public class TaskAttemptListenerImpl ext .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); private JobTokenSecretManager jobTokenSecretManager = null; + private AMPreemptionPolicy preemptionPolicy; public TaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager, - RMHeartbeatHandler rmHeartbeatHandler) { + RMHeartbeatHandler rmHeartbeatHandler, + AMPreemptionPolicy preemptionPolicy) { super(TaskAttemptListenerImpl.class.getName()); this.context = context; this.jobTokenSecretManager = jobTokenSecretManager; this.rmHeartbeatHandler = rmHeartbeatHandler; + this.preemptionPolicy = preemptionPolicy; } @Override Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Dec 17 22:54:31 2013 @@ -102,6 +102,8 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; +import 
org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; @@ -188,8 +190,8 @@ public class MRAppMaster extends Composi private ContainerLauncher containerLauncher; private EventHandler<CommitterEvent> committerEventHandler; private Speculator speculator; - private TaskAttemptListener taskAttemptListener; - private JobTokenSecretManager jobTokenSecretManager = + protected TaskAttemptListener taskAttemptListener; + protected JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); private JobId jobId; private boolean newApiCommitter; @@ -197,6 +199,7 @@ public class MRAppMaster extends Composi private JobEventDispatcher jobEventDispatcher; private JobHistoryEventHandler jobHistoryEventHandler; private SpeculatorEventDispatcher speculatorEventDispatcher; + private AMPreemptionPolicy preemptionPolicy; private Job job; private Credentials jobCredentials = new Credentials(); // Filled during init @@ -383,8 +386,12 @@ public class MRAppMaster extends Composi committerEventHandler = createCommitterEventHandler(context, committer); addIfService(committerEventHandler); + //policy handling preemption requests from RM + preemptionPolicy = createPreemptionPolicy(conf); + preemptionPolicy.init(context); + //service to handle requests to TaskUmbilicalProtocol - taskAttemptListener = createTaskAttemptListener(context); + taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy); addIfService(taskAttemptListener); //service to log job history events @@ -475,6 +482,12 @@ public class MRAppMaster extends Composi return committer; } + protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) { + return ReflectionUtils.newInstance(conf.getClass( + MRJobConfig.MR_AM_PREEMPTION_POLICY, + NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf); + } + protected boolean keepJobFiles(JobConf conf) { return (conf.getKeepTaskFilesPattern() != null || conf .getKeepFailedTaskFiles()); @@ -692,10 +705,11 @@ public class MRAppMaster extends Composi } } - protected TaskAttemptListener createTaskAttemptListener(AppContext context) { + protected TaskAttemptListener createTaskAttemptListener(AppContext context, + AMPreemptionPolicy preemptionPolicy) { TaskAttemptListener lis = new TaskAttemptListenerImpl(context, jobTokenSecretManager, - getRMHeartbeatHandler()); + getRMHeartbeatHandler(), preemptionPolicy); return lis; } @@ -805,7 +819,7 @@ public class MRAppMaster extends Composi , containerID); } else { this.containerAllocator = new RMContainerAllocator( - this.clientService, this.context); + this.clientService, this.context, preemptionPolicy); } ((Service)this.containerAllocator).init(getConfig()); ((Service)this.containerAllocator).start(); Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Dec 17 22:54:31 2013 @@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -147,13 +149,17 @@ public class RMContainerAllocator extend private long retryInterval; private long retrystartTime; + private final AMPreemptionPolicy preemptionPolicy; + BlockingQueue<ContainerAllocatorEvent> eventQueue = new LinkedBlockingQueue<ContainerAllocatorEvent>(); private ScheduleStats scheduleStats = new ScheduleStats(); - public RMContainerAllocator(ClientService clientService, AppContext context) { + public RMContainerAllocator(ClientService clientService, AppContext context, + AMPreemptionPolicy preemptionPolicy) { super(clientService, context); + this.preemptionPolicy = preemptionPolicy; this.stopped = new AtomicBoolean(false); } @@ -361,11 +367,15 @@ public class RMContainerAllocator extend LOG.error("Could not deallocate container for task attemptId " + aId); } + preemptionPolicy.handleCompletedContainer(event.getAttemptID()); } else if ( event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) { ContainerFailedEvent fEv = (ContainerFailedEvent) event; String host = getHost(fEv.getContMgrAddress()); containerFailedOnHost(host); + // propagate failures to preemption policy to discard checkpoints for + // failed tasks + preemptionPolicy.handleFailedContainer(event.getAttemptID()); } } @@ -399,7 +409,7 @@ public class RMContainerAllocator extend } scheduledRequests.reduces.clear(); - //preempt for making space for atleast one map + //preempt for making space for at least one map int premeptionLimit = Math.max(mapResourceReqt, (int) (maxReducePreemptionLimit * memLimit)); @@ -409,7 +419,7 @@ public class RMContainerAllocator extend int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt); toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); - LOG.info("Going to preempt " + toPreempt); + LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); assignedRequests.preemptReduce(toPreempt); } } @@ -595,6 +605,14 @@ public class RMContainerAllocator extend } List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses(); + + // propagate preemption requests + final PreemptionMessage preemptReq = response.getPreemptionMessage(); + if (preemptReq != null) { + 
preemptionPolicy.preempt( + new PreemptionContext(assignedRequests), preemptReq); + } + if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) { //something changed recalculateReduceSchedule = true; @@ -630,7 +648,9 @@ public class RMContainerAllocator extend String diagnostics = StringInterner.weakIntern(cont.getDiagnostics()); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnostics)); - } + + preemptionPolicy.handleCompletedContainer(attemptID); + } } return newContainers; } @@ -1232,4 +1252,27 @@ public class RMContainerAllocator extend " RackLocal:" + rackLocalAssigned); } } + + static class PreemptionContext extends AMPreemptionPolicy.Context { + final AssignedRequests reqs; + + PreemptionContext(AssignedRequests reqs) { + this.reqs = reqs; + } + @Override + public TaskAttemptId getTaskAttempt(ContainerId container) { + return reqs.get(container); + } + + @Override + public List<Container> getContainers(TaskType t){ + if(TaskType.REDUCE.equals(t)) + return new ArrayList<Container>(reqs.reduces.values()); + if(TaskType.MAP.equals(t)) + return new ArrayList<Container>(reqs.maps.values()); + return null; + } + + } + } Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java?rev=1551748&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java Tue Dec 17 22:54:31 2013 @@ -0,0 +1,117 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.mapreduce.v2.app.rm.preemption; + +import java.util.List; + +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskID; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; + +/** + * Policy encoding the {@link org.apache.hadoop.mapreduce.v2.app.MRAppMaster} + * response to preemption requests from the ResourceManager. + * @see org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator + */ +public interface AMPreemptionPolicy { + + public abstract class Context { + + /** + * @param container ID of container to preempt + * @return Task associated with the running container or <code>null</code> + * if no task is bound to that container. + */ + public abstract TaskAttemptId getTaskAttempt(ContainerId container); + + /** + * Method provides the complete list of containers running tasks of type t + * for this AM. + * @param t the type of containers + * @return the list of containers running tasks of type t + */ + public abstract List<Container> getContainers(TaskType t); + + } + + public void init(AppContext context); + + /** + * Callback informing the policy of ResourceManager requests for resources + * to return to the cluster. The policy may take arbitrary action to satisfy + * requests by checkpointing task state, returning containers, or ignoring + * requests. The RM may elect to enforce these requests by forcibly killing + * containers not returned after some duration. + * @param context Handle to the current state of running containers + * @param preemptionRequests Request from RM for resources to return. + */ + public void preempt(Context context, PreemptionMessage preemptionRequests); + + /** + * This method is invoked by components interested in learning whether a certain + * task is being preempted. + * @param attemptID Task attempt to query + * @return true if this attempt is being preempted + */ + public boolean isPreempted(TaskAttemptId attemptID); + + /** + * This method is used to report to the policy that a certain task has been + * successfully preempted (for bookkeeping, counters, etc.). + * @param attemptID Task attempt that was preempted + */ + public void reportSuccessfulPreemption(TaskAttemptID attemptID); + + /** + * Callback informing the policy of containers exiting with a failure. This + * allows the policy to implement cleanup/compensating actions. + * @param attemptID Task attempt that failed + */ + public void handleFailedContainer(TaskAttemptId attemptID); + + /** + * Callback informing the policy of containers exiting cleanly. This is + * reported to the policy for bookkeeping purposes. + * @param attemptID Task attempt that completed + */ + public void handleCompletedContainer(TaskAttemptId attemptID); + + /** + * Method to retrieve the latest checkpoint for a given {@link TaskID} + * @param taskId TaskID + * @return CheckpointID associated with this task or null + */ + public TaskCheckpointID getCheckpointID(TaskID taskId); + + /** + * Method to store the latest {@link + * org.apache.hadoop.mapreduce.checkpoint.CheckpointID} for a given {@link + * TaskID}. Assigning a null is akin to removing all previous checkpoints for + * this task.
+ * @param taskId TaskID + * @param cid Checkpoint to assign or <tt>null</tt> to remove it. + */ + public void setCheckpointID(TaskID taskId, TaskCheckpointID cid); + +} Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java?rev=1551748&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java Tue Dec 17 22:54:31 2013 @@ -0,0 +1,111 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapreduce.v2.app.rm.preemption; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskID; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.PreemptionContainer; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.event.EventHandler; + +/** + * Sample policy that aggressively kills tasks when requested. 
+ */ +public class KillAMPreemptionPolicy implements AMPreemptionPolicy { + + private static final Log LOG = + LogFactory.getLog(KillAMPreemptionPolicy.class); + + @SuppressWarnings("rawtypes") + private EventHandler dispatcher = null; + + @Override + public void init(AppContext context) { + dispatcher = context.getEventHandler(); + } + + @Override + public void preempt(Context ctxt, PreemptionMessage preemptionRequests) { + // for both strict and negotiable preemption requests kill the + // container + for (PreemptionContainer c : + preemptionRequests.getStrictContract().getContainers()) { + killContainer(ctxt, c); + } + for (PreemptionContainer c : + preemptionRequests.getContract().getContainers()) { + killContainer(ctxt, c); + } + } + + @SuppressWarnings("unchecked") + private void killContainer(Context ctxt, PreemptionContainer c){ + ContainerId reqCont = c.getId(); + TaskAttemptId reqTask = ctxt.getTaskAttempt(reqCont); + LOG.info("Evicting " + reqTask); + dispatcher.handle(new TaskAttemptEvent(reqTask, + TaskAttemptEventType.TA_KILL)); + + // add preemption to counters + JobCounterUpdateEvent jce = new JobCounterUpdateEvent(reqTask + .getTaskId().getJobId()); + jce.addCounterUpdate(JobCounter.TASKS_REQ_PREEMPT, 1); + dispatcher.handle(jce); + } + + @Override + public void handleFailedContainer(TaskAttemptId attemptID) { + // ignore + } + + @Override + public boolean isPreempted(TaskAttemptId yarnAttemptID) { + return false; + } + + @Override + public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) { + // ignore + } + + @Override + public TaskCheckpointID getCheckpointID(TaskID taskId) { + return null; + } + + @Override + public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) { + // ignore + } + + @Override + public void handleCompletedContainer(TaskAttemptId attemptID) { + // ignore + } + +} Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java?rev=1551748&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java Tue Dec 17 22:54:31 2013 @@ -0,0 +1,72 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapreduce.v2.app.rm.preemption; + +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskID; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; + +/** + * NoOp policy that ignores all the requests for preemption. + */ +public class NoopAMPreemptionPolicy implements AMPreemptionPolicy { + + @Override + public void init(AppContext context){ + // do nothing + } + + @Override + public void preempt(Context ctxt, PreemptionMessage preemptionRequests) { + // do nothing, ignore all requests + } + + @Override + public void handleFailedContainer(TaskAttemptId attemptID) { + // do nothing + } + + @Override + public boolean isPreempted(TaskAttemptId yarnAttemptID) { + return false; + } + + @Override + public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) { + // ignore + } + + @Override + public TaskCheckpointID getCheckpointID(TaskID taskId) { + return null; + } + + @Override + public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) { + // ignore + } + + @Override + public void handleCompletedContainer(TaskAttemptId attemptID) { + // ignore + } + +} Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Tue Dec 17 22:54:31 2013 @@ -60,7 +60,7 @@ public class TestTaskAttemptListenerImpl JobTokenSecretManager jobTokenSecretManager, RMHeartbeatHandler rmHeartbeatHandler, TaskHeartbeatHandler hbHandler) { - super(context, jobTokenSecretManager, rmHeartbeatHandler); + super(context, jobTokenSecretManager, rmHeartbeatHandler, null); this.taskHeartbeatHandler = hbHandler; } @@ -191,7 +191,7 @@ public class TestTaskAttemptListenerImpl mock(RMHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); TaskAttemptListenerImpl listener = - new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { +
new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -245,7 +245,7 @@ public class TestTaskAttemptListenerImpl mock(RMHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); TaskAttemptListenerImpl listener = - new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Dec 17 22:54:31 2013 @@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; @@ -467,7 +468,8 @@ public class MRApp extends MRAppMaster { } @Override - protected TaskAttemptListener createTaskAttemptListener(AppContext context) { + protected TaskAttemptListener createTaskAttemptListener( + AppContext context, AMPreemptionPolicy policy) { return new TaskAttemptListener(){ @Override public InetSocketAddress getAddress() { Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Tue Dec 17 22:54:31 2013 @@ -33,6 +33,8 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; +import 
org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -61,6 +63,8 @@ public class MRAppBenchmark { /** * Runs memory and time benchmark with Mock MRApp. + * @param app Application to submit + * @throws Exception On application failure */ public void run(MRApp app) throws Exception { Logger rootLogger = LogManager.getRootLogger(); @@ -133,6 +137,7 @@ public class MRAppBenchmark { protected void serviceStart() throws Exception { thread = new Thread(new Runnable() { @Override + @SuppressWarnings("unchecked") public void run() { ContainerAllocatorEvent event = null; while (!Thread.currentThread().isInterrupted()) { @@ -192,7 +197,9 @@ public class MRAppBenchmark { @Override protected ContainerAllocator createContainerAllocator( ClientService clientService, AppContext context) { - return new RMContainerAllocator(clientService, context) { + + AMPreemptionPolicy policy = new NoopAMPreemptionPolicy(); + return new RMContainerAllocator(clientService, context, policy) { @Override protected ApplicationMasterProtocol createSchedulerProxy() { return new ApplicationMasterProtocol() { Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Tue Dec 17 22:54:31 2013 @@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -247,13 +248,14 @@ public class TestFail { super(maps, reduces, false, "TimeOutTaskMRApp", true); } @Override - protected TaskAttemptListener createTaskAttemptListener(AppContext context) { + protected TaskAttemptListener createTaskAttemptListener( + AppContext context, AMPreemptionPolicy policy) { //This will create the TaskAttemptListener with TaskHeartbeatHandler //RPC servers are not started //task time out is reduced //when attempt times out, heartbeat handler will send the lost event //leading to Attempt failure - return new TaskAttemptListenerImpl(getContext(), null, null) { + return new TaskAttemptListenerImpl(getContext(), null, null, policy) { @Override public void startRpcServer(){}; @Override Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java 
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Dec 17 22:54:31 2013 @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.app; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; + import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.isA; @@ -1428,14 +1430,15 @@ public class TestRMContainerAllocator { // Use this constructor when using a real job. MyContainerAllocator(MyResourceManager rm, ApplicationAttemptId appAttemptId, AppContext context) { - super(createMockClientService(), context); + super(createMockClientService(), context, new NoopAMPreemptionPolicy()); this.rm = rm; } // Use this constructor when you are using a mocked job. public MyContainerAllocator(MyResourceManager rm, Configuration conf, ApplicationAttemptId appAttemptId, Job job) { - super(createMockClientService(), createAppContext(appAttemptId, job)); + super(createMockClientService(), createAppContext(appAttemptId, job), + new NoopAMPreemptionPolicy()); this.rm = rm; super.init(conf); super.start(); @@ -1444,7 +1447,8 @@ public class TestRMContainerAllocator { public MyContainerAllocator(MyResourceManager rm, Configuration conf, ApplicationAttemptId appAttemptId, Job job, Clock clock) { super(createMockClientService(), - createAppContext(appAttemptId, job, clock)); + createAppContext(appAttemptId, job, clock), + new NoopAMPreemptionPolicy()); this.rm = rm; super.init(conf); super.start(); @@ -1671,7 +1675,8 @@ public class TestRMContainerAllocator { ApplicationId.newInstance(1, 1)); RMContainerAllocator allocator = new RMContainerAllocator( - mock(ClientService.class), appContext) { + mock(ClientService.class), appContext, + new NoopAMPreemptionPolicy()) { @Override protected void register() { } @@ -1721,7 +1726,8 @@ public class TestRMContainerAllocator { @Test public void testCompletedContainerEvent() { RMContainerAllocator allocator = new RMContainerAllocator( - mock(ClientService.class), mock(AppContext.class)); + mock(ClientService.class), mock(AppContext.class), + new NoopAMPreemptionPolicy()); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId( MRBuilderUtils.newTaskId( Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java (original) +++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java Tue Dec 17 22:54:31 2013 @@ -45,5 +45,9 @@ public enum JobCounter { TOTAL_LAUNCHED_UBERTASKS, NUM_UBER_SUBMAPS, NUM_UBER_SUBREDUCES, - NUM_FAILED_UBERTASKS + NUM_FAILED_UBERTASKS, + TASKS_REQ_PREEMPT, + CHECKPOINTS, + CHECKPOINT_BYTES, + CHECKPOINT_TIME } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Dec 17 22:54:31 2013 @@ -459,7 +459,13 @@ public interface MRJobConfig { public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = MR_AM_PREFIX + "job.reduce.preemption.limit"; public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f; - + + /** + * Policy class encoding responses to preemption requests. + */ + public static final String MR_AM_PREEMPTION_POLICY = + MR_AM_PREFIX + "preemption.policy"; + /** AM ACL disabled. **/ public static final String JOB_AM_ACCESS_DISABLED = "mapreduce.job.am-access-disabled"; @@ -708,4 +714,7 @@ public interface MRJobConfig { public static final String MR_APPLICATION_TYPE = "MAPREDUCE"; + public static final String TASK_PREEMPTION = + "mapreduce.job.preemption"; + } Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java?rev=1551748&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java Tue Dec 17 22:54:31 2013 @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +public enum EnumCounter { + INPUTKEY, + INPUTVALUE, + OUTPUTRECORDS, + CHECKPOINT_BYTES, + CHECKPOINT_MS +} Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java ------------------------------------------------------------------------------ svn:eol-style = native Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java?rev=1551748&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java Tue Dec 17 22:54:31 2013 @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.Counters; + +/** + * Implementation of CheckpointID used in MR. It contains a reference to an + * underlying FileSystem based checkpoint, and various metadata about the + * cost of checkpoints and other counters. This is sent by the task to the AM + * to be stored and provided to the next execution of the same task.
+ */ +public class TaskCheckpointID implements CheckpointID{ + + FSCheckpointID rawId; + private List<Path> partialOutput; + private Counters counters; + + public TaskCheckpointID() { + this.rawId = new FSCheckpointID(); + this.partialOutput = new ArrayList<Path>(); + } + + public TaskCheckpointID(FSCheckpointID rawId, List<Path> partialOutput, + Counters counters) { + this.rawId = rawId; + this.counters = counters; + if(partialOutput == null) + this.partialOutput = new ArrayList<Path>(); + else + this.partialOutput = partialOutput; + } + + @Override + public void write(DataOutput out) throws IOException { + counters.write(out); + if (partialOutput == null) { + WritableUtils.writeVLong(out, 0L); + } else { + WritableUtils.writeVLong(out, partialOutput.size()); + for(Path p:partialOutput){ + Text.writeString(out, p.toString()); + } + } + rawId.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + partialOutput.clear(); + counters.readFields(in); + long numPout = WritableUtils.readVLong(in); + for(int i=0;i<numPout;i++) + partialOutput.add(new Path(Text.readString(in))); + rawId.readFields(in); + } + + @Override + public boolean equals(Object other){ + if (other instanceof TaskCheckpointID){ + return this.rawId.equals(((TaskCheckpointID)other).rawId) && + this.counters.equals(((TaskCheckpointID) other).counters) && + this.partialOutput.containsAll(((TaskCheckpointID) other).partialOutput) && + ((TaskCheckpointID) other).partialOutput.containsAll(this.partialOutput); + } else { + return false; + } + } + + @Override + public int hashCode() { + return rawId.hashCode(); + } + + /** + * @return the size of the checkpoint in bytes + */ + public long getCheckpointBytes() { + return counters.findCounter(EnumCounter.CHECKPOINT_BYTES).getValue(); + } + + /** + * @return how long it took to take this checkpoint + */ + public long getCheckpointTime() { + return counters.findCounter(EnumCounter.CHECKPOINT_MS).getValue(); + } + + public String toString(){ + return rawId.toString() + " counters:" + counters; + + } + + public List<Path> getPartialCommittedOutput() { + return partialOutput; + } + + public Counters getCounters() { + return counters; + } + +} Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties?rev=1551748&r1=1551747&r2=1551748&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties Tue Dec 17 22:54:31 2013 @@ -27,3 +27,7 @@ SLOTS_MILLIS_MAPS.name= Total SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces in occupied slots (ms) FALLOW_SLOTS_MILLIS_MAPS.name= Total time 
spent by all maps waiting after reserving slots (ms) FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms) +TASKS_REQ_PREEMPT.name= Tasks that have been asked to preempt +CHECKPOINTS.name= Number of checkpoints reported +CHECKPOINT_BYTES.name= Total amount of bytes in checkpoints +CHECKPOINT_TIME.name= Total time spent checkpointing (ms) \ No newline at end of file
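The AMPreemptionPolicy contract added above couples the allocator-facing callbacks (preempt, handleFailedContainer, handleCompletedContainer) with the task-facing bookkeeping (isPreempted, getCheckpointID/setCheckpointID). To make the intended division of labor concrete, here is a rough sketch of a policy that kills strict-contract containers immediately (as the sample KillAMPreemptionPolicy does) but only flags negotiable-contract containers so the task side can checkpoint first. This class is illustrative only: it is not part of this commit, and its name and behavior are assumptions layered on the API above.

  package org.apache.hadoop.mapreduce.v2.app.rm.preemption;

  import java.util.Collections;
  import java.util.Map;
  import java.util.Set;
  import java.util.concurrent.ConcurrentHashMap;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.mapred.TaskAttemptID;
  import org.apache.hadoop.mapred.TaskID;
  import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
  import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
  import org.apache.hadoop.mapreduce.v2.app.AppContext;
  import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
  import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
  import org.apache.hadoop.yarn.api.records.PreemptionContainer;
  import org.apache.hadoop.yarn.api.records.PreemptionMessage;
  import org.apache.hadoop.yarn.event.EventHandler;

  // Hypothetical class; not part of this patch.
  public class FlagAndCheckpointPreemptionPolicy implements AMPreemptionPolicy {
    private static final Log LOG =
        LogFactory.getLog(FlagAndCheckpointPreemptionPolicy.class);
    // latest checkpoint per task, maintained through setCheckpointID()
    private final Map<TaskID, TaskCheckpointID> checkpoints =
        new ConcurrentHashMap<TaskID, TaskCheckpointID>();
    // attempts asked (negotiably) to vacate their containers
    private final Set<TaskAttemptId> flagged = Collections.newSetFromMap(
        new ConcurrentHashMap<TaskAttemptId, Boolean>());
    @SuppressWarnings("rawtypes")
    private EventHandler dispatcher;

    @Override
    public void init(AppContext context) {
      dispatcher = context.getEventHandler();
    }

    @SuppressWarnings("unchecked")
    @Override
    public void preempt(Context ctxt, PreemptionMessage preemptionRequests) {
      // Strict contract: the RM reclaims these containers regardless, so kill
      // the attempts right away, as the sample KillAMPreemptionPolicy does.
      if (preemptionRequests.getStrictContract() != null) {
        for (PreemptionContainer c :
            preemptionRequests.getStrictContract().getContainers()) {
          TaskAttemptId tid = ctxt.getTaskAttempt(c.getId());
          if (tid != null) {
            dispatcher.handle(
                new TaskAttemptEvent(tid, TaskAttemptEventType.TA_KILL));
          }
        }
      }
      // Negotiable contract: only flag the attempts; isPreempted() exposes the
      // flag so the task side can checkpoint and exit on its own schedule.
      if (preemptionRequests.getContract() != null) {
        for (PreemptionContainer c :
            preemptionRequests.getContract().getContainers()) {
          TaskAttemptId tid = ctxt.getTaskAttempt(c.getId());
          if (tid != null && flagged.add(tid)) {
            LOG.info("Flagging " + tid + " for checkpoint-and-exit");
          }
        }
      }
    }

    @Override
    public boolean isPreempted(TaskAttemptId attemptID) {
      return flagged.contains(attemptID);
    }
    @Override
    public void reportSuccessfulPreemption(TaskAttemptID attemptID) {
      LOG.info(attemptID + " exited after saving a checkpoint");
    }
    @Override
    public void handleFailedContainer(TaskAttemptId attemptID) {
      flagged.remove(attemptID); // a failed attempt can no longer comply
    }
    @Override
    public void handleCompletedContainer(TaskAttemptId attemptID) {
      flagged.remove(attemptID);
    }
    @Override
    public TaskCheckpointID getCheckpointID(TaskID taskId) {
      return checkpoints.get(taskId);
    }
    @Override
    public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) {
      if (cid == null) {
        checkpoints.remove(taskId); // null clears prior checkpoints, per javadoc
      } else {
        checkpoints.put(taskId, cid);
      }
    }
  }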
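TaskCheckpointID is a Writable, so a checkpoint handle can be shipped over the task umbilical or persisted and read back. The round-trip sketch below is illustrative only: the class name and paths are hypothetical, it assumes FSCheckpointID (from MAPREDUCE-5197) offers a Path-taking constructor, and it builds the destination object with an explicit Counters instance because the no-argument TaskCheckpointID constructor in this revision leaves the counters field null.

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.util.Arrays;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.Counters;
  import org.apache.hadoop.mapreduce.checkpoint.EnumCounter;
  import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID;
  import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;

  public class TaskCheckpointIDRoundTrip {
    public static void main(String[] args) throws Exception {
      Counters counters = new Counters();
      counters.findCounter(EnumCounter.CHECKPOINT_BYTES).setValue(1L << 20);
      counters.findCounter(EnumCounter.CHECKPOINT_MS).setValue(1500L);

      TaskCheckpointID original = new TaskCheckpointID(
          new FSCheckpointID(new Path("hdfs:///tmp/checkpoints/cp_000042")),
          Arrays.asList(new Path("hdfs:///tmp/out/_partial/part-r-00003")),
          counters);

      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      original.write(new DataOutputStream(bytes));

      // Pass an empty Counters explicitly so readFields() has an instance to
      // populate (the no-arg constructor does not initialize counters here).
      TaskCheckpointID restored =
          new TaskCheckpointID(new FSCheckpointID(), null, new Counters());
      restored.readFields(
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

      System.out.println("round trip ok: " + original.equals(restored)
          + ", checkpoint bytes: " + restored.getCheckpointBytes()
          + ", checkpoint ms: " + restored.getCheckpointTime());
    }
  }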