Author: vinodkv Date: Wed Jul 24 03:42:52 2013 New Revision: 1506392 URL: http://svn.apache.org/r1506392 Log: YARN-926. Modified ContainerManagementProtocol APIs to take in requests for multiple containers. Contributed by Jian He. MAPREDUCE-5412. Update MR app to use multiple containers API of ContainerManager after YARN-926. Contributed by Jian He. svn merge --ignore-ancestry -c 1506391 ../../trunk/
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1506392&r1=1506391&r2=1506392&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Jul 24 03:42:52 2013 @@ -450,6 +450,9 @@ Release 2.1.0-beta - 2013-07-02 MAPREDUCE-5325. MR changes related to YARN-727. ClientRMProtocol.getAllApplications should accept ApplicationType as a parameter. (Xuan Gong via hitesh) + MAPREDUCE-5412. Update MR app to use multiple containers API of + ContainerManager after YARN-926. (Jian He via vinodkv) + BREAKDOWN OF HADOOP-8562 SUBTASKS MAPREDUCE-4739. Some MapReduce tests fail to find winutils. 
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1506392&r1=1506391&r2=1506392&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Wed Jul 24 03:42:52 2013 @@ -20,7 +20,9 @@ package org.apache.hadoop.mapreduce.v2.a import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -44,14 +46,15 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import 
org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.util.Records; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -139,13 +142,18 @@ public class ContainerLauncherImpl exten event.getContainerLaunchContext(); // Now launch the actual container - StartContainerRequest startRequest = Records - .newRecord(StartContainerRequest.class); - startRequest.setContainerLaunchContext(containerLaunchContext); - startRequest.setContainerToken(event.getContainerToken()); - StartContainerResponse response = - proxy.getContainerManagementProtocol().startContainer(startRequest); - + StartContainerRequest startRequest = + StartContainerRequest.newInstance(containerLaunchContext, + event.getContainerToken()); + List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); + list.add(startRequest); + StartContainersRequest requestList = StartContainersRequest.newInstance(list); + StartContainersResponse response = + proxy.getContainerManagementProtocol().startContainers(requestList); + if (response.getFailedRequests() != null + && response.getFailedRequests().containsKey(containerID)) { + throw response.getFailedRequests().get(containerID).deSerialize(); + } ByteBuffer portInfo = response.getAllServicesMetaData().get( ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID); @@ -192,13 +200,17 @@ public class ContainerLauncherImpl exten proxy = getCMProxy(this.containerMgrAddress, this.containerID); // kill the remote container if already launched - StopContainerRequest stopRequest = Records - .newRecord(StopContainerRequest.class); - stopRequest.setContainerId(this.containerID); - 
proxy.getContainerManagementProtocol().stopContainer(stopRequest); - + List<ContainerId> ids = new ArrayList<ContainerId>(); + ids.add(this.containerID); + StopContainersRequest request = StopContainersRequest.newInstance(ids); + StopContainersResponse response = + proxy.getContainerManagementProtocol().stopContainers(request); + if (response.getFailedRequests() != null + && response.getFailedRequests().containsKey(this.containerID)) { + throw response.getFailedRequests().get(this.containerID) + .deSerialize(); + } } catch (Throwable t) { - // ignore the cleanup failure String message = "cleanup failed for container " + this.containerID + " : " Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1506392&r1=1506391&r2=1506392&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Wed Jul 24 03:42:52 2013 @@ -24,6 +24,8 @@ import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; @@ -52,12 +54,13 @@ 
import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -393,18 +396,18 @@ public class TestContainerLauncher { private ContainerStatus status = null; @Override - public GetContainerStatusResponse getContainerStatus( - GetContainerStatusRequest request) throws IOException { - GetContainerStatusResponse response = recordFactory - .newRecordInstance(GetContainerStatusResponse.class); - response.setStatus(status); - return response; + public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) throws IOException { + List<ContainerStatus> statuses = new ArrayList<ContainerStatus>(); + statuses.add(status); + return GetContainerStatusesResponse.newInstance(statuses, null); } @Override - public StartContainerResponse 
startContainer(StartContainerRequest request) + public StartContainersResponse startContainers(StartContainersRequest requests) throws IOException { + StartContainerRequest request = requests.getStartContainerRequests().get(0); ContainerTokenIdentifier containerTokenIdentifier = MRApp.newContainerTokenIdentifier(request.getContainerToken()); @@ -412,8 +415,8 @@ public class TestContainerLauncher { Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_PORT, containerTokenIdentifier.getNmHostAddress()); - StartContainerResponse response = recordFactory - .newRecordInstance(StartContainerResponse.class); + StartContainersResponse response = recordFactory + .newRecordInstance(StartContainersResponse.class); status = recordFactory.newRecordInstance(ContainerStatus.class); try { // make the thread sleep to look like its not going to respond @@ -429,7 +432,7 @@ public class TestContainerLauncher { } @Override - public StopContainerResponse stopContainer(StopContainerRequest request) + public StopContainersResponse stopContainers(StopContainersRequest request) throws IOException { Exception e = new Exception("Dummy function", new Exception( "Dummy function cause")); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1506392&r1=1506391&r2=1506392&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original) +++ 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Wed Jul 24 03:42:52 2013 @@ -45,12 +45,12 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -162,8 +162,8 @@ public class TestContainerLauncherImpl { try { ContainerId contId = makeContainerId(0l, 0, 0, 1); TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0); - StartContainerResponse startResp = - recordFactory.newRecordInstance(StartContainerResponse.class); + StartContainersResponse startResp = + recordFactory.newRecordInstance(StartContainersResponse.class); 
startResp.setAllServicesMetaData(serviceResponse); @@ -176,14 +176,14 @@ public class TestContainerLauncherImpl { .thenReturn(contId); when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId); when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress); - when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp); + when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp); when(mockLaunchEvent.getContainerToken()).thenReturn( createNewContainerToken(contId, cmAddress)); ut.handle(mockLaunchEvent); ut.waitForPoolToIdle(); - verify(mockCM).startContainer(any(StartContainerRequest.class)); + verify(mockCM).startContainers(any(StartContainersRequest.class)); LOG.info("inserting cleanup event"); ContainerLauncherEvent mockCleanupEvent = @@ -198,7 +198,7 @@ public class TestContainerLauncherImpl { ut.waitForPoolToIdle(); - verify(mockCM).stopContainer(any(StopContainerRequest.class)); + verify(mockCM).stopContainers(any(StopContainersRequest.class)); } finally { ut.stop(); } @@ -224,8 +224,8 @@ public class TestContainerLauncherImpl { ContainerId contId = makeContainerId(0l, 0, 0, 1); TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0); String cmAddress = "127.0.0.1:8000"; - StartContainerResponse startResp = - recordFactory.newRecordInstance(StartContainerResponse.class); + StartContainersResponse startResp = + recordFactory.newRecordInstance(StartContainersResponse.class); startResp.setAllServicesMetaData(serviceResponse); LOG.info("inserting cleanup event"); @@ -241,7 +241,7 @@ public class TestContainerLauncherImpl { ut.waitForPoolToIdle(); - verify(mockCM, never()).stopContainer(any(StopContainerRequest.class)); + verify(mockCM, never()).stopContainers(any(StopContainersRequest.class)); LOG.info("inserting launch event"); ContainerRemoteLaunchEvent mockLaunchEvent = @@ -252,14 +252,14 @@ public class TestContainerLauncherImpl { .thenReturn(contId); 
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId); when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress); - when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp); + when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp); when(mockLaunchEvent.getContainerToken()).thenReturn( createNewContainerToken(contId, cmAddress)); ut.handle(mockLaunchEvent); ut.waitForPoolToIdle(); - verify(mockCM, never()).startContainer(any(StartContainerRequest.class)); + verify(mockCM, never()).startContainers(any(StartContainersRequest.class)); } finally { ut.stop(); } @@ -286,8 +286,8 @@ public class TestContainerLauncherImpl { ContainerId contId = makeContainerId(0l, 0, 0, 1); TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0); String cmAddress = "127.0.0.1:8000"; - StartContainerResponse startResp = - recordFactory.newRecordInstance(StartContainerResponse.class); + StartContainersResponse startResp = + recordFactory.newRecordInstance(StartContainersResponse.class); startResp.setAllServicesMetaData(serviceResponse); LOG.info("inserting launch event"); @@ -299,20 +299,20 @@ public class TestContainerLauncherImpl { .thenReturn(contId); when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId); when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress); - when(mockCM.startContainer(any(StartContainerRequest.class))).thenReturn(startResp); + when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp); when(mockLaunchEvent.getContainerToken()).thenReturn( createNewContainerToken(contId, cmAddress)); ut.handle(mockLaunchEvent); ut.waitForPoolToIdle(); - verify(mockCM).startContainer(any(StartContainerRequest.class)); + verify(mockCM).startContainers(any(StartContainersRequest.class)); // skip cleanup and make sure stop kills the container } finally { ut.stop(); - verify(mockCM).stopContainer(any(StopContainerRequest.class)); + 
verify(mockCM).stopContainers(any(StopContainersRequest.class)); } } @@ -341,8 +341,8 @@ public class TestContainerLauncherImpl { ContainerId contId = makeContainerId(0l, 0, 0, 1); TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0); String cmAddress = "127.0.0.1:8000"; - StartContainerResponse startResp = - recordFactory.newRecordInstance(StartContainerResponse.class); + StartContainersResponse startResp = + recordFactory.newRecordInstance(StartContainersResponse.class); startResp.setAllServicesMetaData(serviceResponse); @@ -415,7 +415,7 @@ public class TestContainerLauncherImpl { this.completeLaunchBarrier = completeLaunchBarrier; } @Override - public StartContainerResponse startContainer(StartContainerRequest request) + public StartContainersResponse startContainers(StartContainersRequest request) throws IOException { try { startLaunchBarrier.await(); @@ -433,16 +433,14 @@ public class TestContainerLauncherImpl { } @Override - public StopContainerResponse stopContainer(StopContainerRequest request) + public StopContainersResponse stopContainers(StopContainersRequest request) throws IOException { - return null; } @Override - public GetContainerStatusResponse getContainerStatus( - GetContainerStatusRequest request) throws IOException { - + public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) throws IOException { return null; } }