Repository: hadoop Updated Branches: refs/heads/MR-6749 48a24ec9e -> 6507db0bc
MAPREDUCE-6785. ContainerLauncherImpl support for reusing the containers. Contributed by Naganarasimha G R. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6507db0b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6507db0b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6507db0b Branch: refs/heads/MR-6749 Commit: 6507db0bcd9bf08e20103305f0b3acf302436fa9 Parents: 48a24ec Author: Devaraj K <deva...@apache.org> Authored: Tue Apr 4 15:48:35 2017 -0700 Committer: Devaraj K <deva...@apache.org> Committed: Tue Apr 4 15:48:35 2017 -0700 ---------------------------------------------------------------------- .../TaskAttemptContainerAssignedEvent.java | 9 ++ .../v2/app/job/impl/TaskAttemptImpl.java | 98 +++++++++++--------- .../mapreduce/v2/app/rm/ContainerRequestor.java | 6 ++ .../v2/app/rm/RMContainerAllocator.java | 10 +- .../v2/app/rm/RMContainerRequestor.java | 30 +++++- .../v2/app/rm/RMContainerReuseRequestor.java | 62 ++++++++++--- .../app/rm/TestRMContainerReuseRequestor.java | 30 ++++-- 7 files changed, 169 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6507db0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java index 0f69fa8..41dfb03 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java @@ -28,6 +28,7 @@ public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent { private final Container container; private final Map<ApplicationAccessType, String> applicationACLs; + private int shufflePort = -1; public TaskAttemptContainerAssignedEvent(TaskAttemptId id, Container container, Map<ApplicationAccessType, String> applicationACLs) { @@ -36,6 +37,14 @@ public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent { this.applicationACLs = applicationACLs; } + public int getShufflePort() { + return shufflePort; + } + + public void setShufflePort(int shufflePort) { + this.shufflePort = shufflePort; + } + public Container getContainer() { return this.container; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6507db0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 4305824..6d3caae 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -250,7 +250,8 @@ public abstract class TaskAttemptImpl implements // Transitions from the UNASSIGNED state. .addTransition(TaskAttemptStateInternal.UNASSIGNED, - TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED, + EnumSet.of(TaskAttemptStateInternal.ASSIGNED, + TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_ASSIGNED, new ContainerAssignedTransition()) .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition( @@ -1676,13 +1677,14 @@ public abstract class TaskAttemptImpl implements } private static class ContainerAssignedTransition implements - SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { + MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, + TaskAttemptStateInternal> { @SuppressWarnings({ "unchecked" }) @Override - public void transition(final TaskAttemptImpl taskAttempt, - TaskAttemptEvent event) { - final TaskAttemptContainerAssignedEvent cEvent = - (TaskAttemptContainerAssignedEvent) event; + public TaskAttemptStateInternal transition( + final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { + final TaskAttemptContainerAssignedEvent cEvent = + (TaskAttemptContainerAssignedEvent) event; Container container = cEvent.getContainer(); taskAttempt.container = container; // this is a _real_ Task (classic Hadoop mapred flavor): @@ -1695,20 +1697,26 @@ public abstract class TaskAttemptImpl implements taskAttempt.remoteTask, taskAttempt.jvmID); taskAttempt.computeRackAndLocality(); - - //launch the container - //create the container object to be launched for a given Task attempt - ContainerLaunchContext launchContext = createContainerLaunchContext( - cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, - taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, - taskAttempt.taskAttemptListener, taskAttempt.credentials); - taskAttempt.eventHandler - .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, - launchContext, container, taskAttempt.remoteTask)); - // send event to speculator that our container needs are satisfied - taskAttempt.eventHandler.handle - (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); + if (cEvent.getShufflePort() == -1) { + // launch the container + // create the container object to be launched for a given Task attempt + ContainerLaunchContext launchContext = createContainerLaunchContext( + cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken, + taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID, + taskAttempt.taskAttemptListener, taskAttempt.credentials); + taskAttempt.eventHandler + .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId, + launchContext, container, taskAttempt.remoteTask)); + + // send event to speculator that our container needs are satisfied + taskAttempt.eventHandler + .handle(new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1)); + return TaskAttemptStateInternal.ASSIGNED; + } else { + taskAttempt.onContainerLaunch(cEvent.getShufflePort()); + return TaskAttemptStateInternal.RUNNING; + } } } @@ -1779,7 +1787,6 @@ public abstract class TaskAttemptImpl implements private static class LaunchedContainerTransition implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { - @SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent evnt) { @@ -1787,33 +1794,34 @@ public abstract class TaskAttemptImpl implements TaskAttemptContainerLaunchedEvent event = (TaskAttemptContainerLaunchedEvent) evnt; - //set the launch time - taskAttempt.launchTime = taskAttempt.clock.getTime(); - taskAttempt.shufflePort = event.getShufflePort(); - - // register it to TaskAttemptListener so that it can start monitoring it. - taskAttempt.taskAttemptListener - .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID); - //TODO Resolve to host / IP in case of a local address. - InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr? - NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress()); - taskAttempt.trackerName = nodeHttpInetAddr.getHostName(); - taskAttempt.httpPort = nodeHttpInetAddr.getPort(); - taskAttempt.sendLaunchedEvents(); - taskAttempt.eventHandler.handle - (new SpeculatorEvent - (taskAttempt.attemptId, true, taskAttempt.clock.getTime())); - //make remoteTask reference as null as it is no more needed - //and free up the memory - taskAttempt.remoteTask = null; - - //tell the Task that attempt has started - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_LAUNCHED)); + taskAttempt.onContainerLaunch(event.getShufflePort()); } } - + + @SuppressWarnings("unchecked") + private void onContainerLaunch(int shufflePortParam) { + // set the launch time + launchTime = clock.getTime(); + this.shufflePort = shufflePortParam; + + // register it to TaskAttemptListener so that it can start monitoring it. + taskAttemptListener.registerLaunchedTask(attemptId, jvmID); + // TODO Resolve to host / IP in case of a local address. + InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr? + NetUtils.createSocketAddr(container.getNodeHttpAddress()); + trackerName = nodeHttpInetAddr.getHostName(); + httpPort = nodeHttpInetAddr.getPort(); + sendLaunchedEvents(); + eventHandler.handle(new SpeculatorEvent(attemptId, true, clock.getTime())); + // make remoteTask reference as null as it is no more needed + // and free up the memory + remoteTask = null; + + // tell the Task that attempt has started + eventHandler.handle( + new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_LAUNCHED)); + } + private static class CommitPendingTransition implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/hadoop/blob/6507db0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java index 2d54633..9842e0d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestor.java @@ -18,9 +18,12 @@ package org.apache.hadoop.mapreduce.v2.app.rm; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -38,6 +41,9 @@ public interface ContainerRequestor { void decContainerReq(ContainerRequest request); + void containerAssigned(Container allocated, ContainerRequest assigned, + Map<ApplicationAccessType, String> acls); + void release(ContainerId containerId); boolean isNodeBlacklisted(String hostname); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6507db0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index f374b15..be7ad16 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -55,7 +55,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; @@ -255,7 +254,7 @@ public class RMContainerAllocator extends RMCommunicator dispatcher.register(RMContainerReuseRequestor.EventType.class, (RMContainerReuseRequestor) containerRequestor); } else { - containerRequestor = new RMContainerRequestor(this); + containerRequestor = new RMContainerRequestor(eventHandler, this); } containerRequestor.init(conf); } @@ -1244,11 +1243,8 @@ public class RMContainerAllocator extends RMCommunicator private void containerAssigned(Container allocated, ContainerRequest assigned) { // Update resource requests - containerRequestor.decContainerReq(assigned); - - // send the container-assigned event to task attempt - eventHandler.handle(new TaskAttemptContainerAssignedEvent( - assigned.attemptID, allocated, applicationACLs)); + containerRequestor.containerAssigned(allocated, assigned, + applicationACLs); assignedRequests.add(allocated, assigned.attemptID); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6507db0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 074ead4..c0c69c7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -36,25 +36,29 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -113,11 +117,16 @@ public class RMContainerRequestor extends AbstractService .newSetFromMap(new ConcurrentHashMap<String, Boolean>()); private final ApplicationId applicationId; private final RMCommunicator rmCommunicator; + @SuppressWarnings("rawtypes") + private EventHandler eventHandler; - public RMContainerRequestor(RMCommunicator rmCommunicator) { + @SuppressWarnings("rawtypes") + public RMContainerRequestor(EventHandler eventHandler, + RMCommunicator rmCommunicator) { super(RMContainerRequestor.class.getName()); this.rmCommunicator = rmCommunicator; applicationId = rmCommunicator.applicationId; + this.eventHandler = eventHandler; } @Private @@ -423,17 +432,28 @@ public class RMContainerRequestor extends AbstractService req.nodeLabelExpression); } + @SuppressWarnings("unchecked") + @Override + public void containerAssigned(Container allocated, ContainerRequest req, + Map<ApplicationAccessType, String> applicationACLs) { + decContainerReq(req); + + // send the container-assigned event to task attempt + eventHandler.handle(new TaskAttemptContainerAssignedEvent( + req.attemptID, allocated, applicationACLs)); + } + @Override public void decContainerReq(ContainerRequest req) { // Update resource requests for (String hostName : req.hosts) { decResourceRequest(req.priority, hostName, req.capability); } - + for (String rack : req.racks) { decResourceRequest(req.priority, rack, req.capability); } - + decResourceRequest(req.priority, ResourceRequest.ANY, req.capability); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6507db0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java index 2d78ab0..8bd75af 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerReuseRequestor.java @@ -34,7 +34,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; @@ -53,8 +55,8 @@ public class RMContainerReuseRequestor extends RMContainerRequestor private static final Log LOG = LogFactory .getLog(RMContainerReuseRequestor.class); - private Map<Container, String> containersToReuse = - new ConcurrentHashMap<Container, String>(); + private Map<Container, HostInfo> containersToReuse = + new ConcurrentHashMap<>(); private Map<ContainerId, List<TaskAttemptId>> containerToTaskAttemptsMap = new HashMap<ContainerId, List<TaskAttemptId>>(); private int containerReuseMaxMapTasks; @@ -63,14 +65,17 @@ public class RMContainerReuseRequestor extends RMContainerRequestor private int maxReduceTaskContainers; private int noOfMapTaskContainersForReuse; private int noOfReduceTaskContainersForReuse; + private final RMCommunicator rmCommunicator; + @SuppressWarnings("rawtypes") + private final EventHandler eventHandler; - private RMCommunicator rmCommunicator; - + @SuppressWarnings("rawtypes") public RMContainerReuseRequestor( - EventHandler<ContainerAvailableEvent> eventHandler, + EventHandler eventHandler, RMCommunicator rmCommunicator) { - super(rmCommunicator); + super(eventHandler, rmCommunicator); this.rmCommunicator = rmCommunicator; + this.eventHandler = eventHandler; } @Override @@ -113,8 +118,8 @@ public class RMContainerReuseRequestor extends RMContainerRequestor boolean blacklisted = super.isNodeBlacklisted(hostName); if (blacklisted) { Set<Container> containersOnHost = new HashSet<Container>(); - for (Entry<Container, String> elem : containersToReuse.entrySet()) { - if (elem.getValue().equals(hostName)) { + for (Entry<Container, HostInfo> elem : containersToReuse.entrySet()) { + if (elem.getValue().getHost().equals(hostName)) { containersOnHost.add(elem.getKey()); } } @@ -139,6 +144,7 @@ public class RMContainerReuseRequestor extends RMContainerRequestor containerTaskAttempts = new ArrayList<TaskAttemptId>(); containerToTaskAttemptsMap.put(containerId, containerTaskAttempts); } + TaskAttemptId taskAttemptId = event.getTaskAttemptId(); if (checkMapContainerReuseConstraints(priority, containerTaskAttempts) || checkReduceContainerReuseConstraints(priority, containerTaskAttempts)) { @@ -147,13 +153,17 @@ public class RMContainerReuseRequestor extends RMContainerRequestor // If there are any eligible requests if (resourceRequests != null && !resourceRequests.isEmpty()) { canReuse = true; - containerTaskAttempts.add(event.getTaskAttemptId()); + containerTaskAttempts.add(taskAttemptId); } } ((RMContainerAllocator) rmCommunicator) .resetContainerForReuse(container.getId()); if (canReuse) { - containersToReuse.put(container, resourceName); + int shufflePort = + rmCommunicator.getJob().getTask(taskAttemptId.getTaskId()) + .getAttempt(taskAttemptId).getShufflePort(); + containersToReuse.put(container, + new HostInfo(resourceName, shufflePort)); incrementRunningReuseContainers(priority); LOG.info("Adding the " + containerId + " for reuse."); } else { @@ -211,7 +221,7 @@ public class RMContainerReuseRequestor extends RMContainerRequestor @Private @VisibleForTesting - Map<Container, String> getContainersToReuse() { + Map<Container, HostInfo> getContainersToReuse() { return containersToReuse; } @@ -221,4 +231,34 @@ public class RMContainerReuseRequestor extends RMContainerRequestor public static enum EventType { CONTAINER_AVAILABLE } + + @SuppressWarnings("unchecked") + @Override + public void containerAssigned(Container allocated, ContainerRequest req, + Map<ApplicationAccessType, String> applicationACLs) { + if(containersToReuse.containsKey(allocated)){ + decContainerReq(req); + // send the container-assigned event to task attempt + eventHandler.handle(new TaskAttemptContainerAssignedEvent( + req.attemptID, allocated, applicationACLs)); + } else { + super.containerAssigned(allocated, req, applicationACLs); + } + } + + static class HostInfo { + private String host; + private int port; + public HostInfo(String host, int port) { + super(); + this.host = host; + this.port = port; + } + public String getHost() { + return host; + } + public int getPort() { + return port; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6507db0b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java index d747e74..2ca7cc8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerReuseRequestor.java @@ -18,7 +18,9 @@ package org.apache.hadoop.mapreduce.v2.app.rm; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.Map; @@ -29,8 +31,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.EventType; +import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.HostInfo; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -51,8 +57,16 @@ public class TestRMContainerReuseRequestor { @Before public void setup() throws IOException { + RMContainerAllocator allocator = mock(RMContainerAllocator.class); + Job job = mock(Job.class); + Task task = mock(Task.class); + TaskAttempt taskAttempt = mock(TaskAttempt.class); + when(taskAttempt.getShufflePort()).thenReturn(0); + when(task.getAttempt(any(TaskAttemptId.class))).thenReturn(taskAttempt); + when(job.getTask(any(TaskId.class))).thenReturn(task); + when(allocator.getJob()).thenReturn(job); reuseRequestor = new RMContainerReuseRequestor(null, - mock(RMContainerAllocator.class)); + allocator); } @Test @@ -138,14 +152,14 @@ public class TestRMContainerReuseRequestor { @Test public void testContainerFailedOnHost() throws Exception { reuseRequestor.serviceInit(new Configuration()); - Map<Container, String> containersToReuse = reuseRequestor + Map<Container, HostInfo> containersToReuse = reuseRequestor .getContainersToReuse(); containersToReuse .put(newContainerInstance("container_1472171035081_0009_01_000008", - RMContainerAllocator.PRIORITY_REDUCE), "node1"); + RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node1", 1999)); containersToReuse .put(newContainerInstance("container_1472171035081_0009_01_000009", - RMContainerAllocator.PRIORITY_REDUCE), "node2"); + RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node2", 1999)); reuseRequestor.getBlacklistedNodes().add("node1"); // It removes all containers from containersToReuse running in node1 reuseRequestor.containerFailedOnHost("node1"); @@ -172,7 +186,7 @@ public class TestRMContainerReuseRequestor { ContainerAvailableEvent event = new ContainerAvailableEvent( EventType.CONTAINER_AVAILABLE, taskAttemptId, container); reuseRequestor.handle(event); - Map<Container, String> containersToReuse = reuseRequestor + Map<Container, HostInfo> containersToReuse = reuseRequestor .getContainersToReuse(); Assert.assertTrue("Container should be added for reuse.", containersToReuse.containsKey(container)); @@ -206,7 +220,7 @@ public class TestRMContainerReuseRequestor { ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType, taskAttemptId1, container); reuseRequestor.handle(event1); - Map<Container, String> containersToReuse = reuseRequestor + Map<Container, HostInfo> containersToReuse = reuseRequestor .getContainersToReuse(); // It is reusing the container Assert.assertTrue("Container should be added for reuse.", @@ -236,7 +250,7 @@ public class TestRMContainerReuseRequestor { ContainerAvailableEvent event1 = new ContainerAvailableEvent( EventType.CONTAINER_AVAILABLE, taskAttemptId1, container); reuseRequestor.handle(event1); - Map<Container, String> containersToReuse = reuseRequestor + Map<Container, HostInfo> containersToReuse = reuseRequestor .getContainersToReuse(); Assert.assertTrue("Container should be added for reuse.", containersToReuse.containsKey(container)); @@ -269,7 +283,7 @@ public class TestRMContainerReuseRequestor { ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType, taskAttemptId1, container1); reuseRequestor.handle(event1); - Map<Container, String> containersToReuse = reuseRequestor + Map<Container, HostInfo> containersToReuse = reuseRequestor .getContainersToReuse(); Assert.assertTrue("Container should be added for reuse.", containersToReuse.containsKey(container1)); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org