Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1471229&r1=1471228&r2=1471229&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Wed Apr 24 02:21:58 2013 @@ -42,6 +42,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -72,6 +73,7 @@ import org.apache.hadoop.security.Creden import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; @@ -491,7 +494,7 @@ public class TestResourceLocalizationSer Thread.sleep(1000); dispatcher.await(); String appStr = ConverterUtils.toString(appId); - String ctnrStr = c.getContainerID().toString(); + String ctnrStr = c.getContainer().getId().toString(); ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class); verify(exec).startLocalizer(tokenPathCaptor.capture(), isA(InetSocketAddress.class), eq("user0"), eq(appStr), eq(ctnrStr), @@ -567,7 +570,7 @@ public class TestResourceLocalizationSer public boolean matches(Object o) { ContainerEvent evt = (ContainerEvent) o; return evt.getType() == ContainerEventType.RESOURCE_LOCALIZED - && c.getContainerID() == evt.getContainerID(); + && c.getContainer().getId() == evt.getContainerID(); } }; // total 2 resource localzation calls. one for each resource. @@ -756,11 +759,11 @@ public class TestResourceLocalizationSer // Container - 1 ContainerImpl container1 = createMockContainer(user, 1); - String localizerId1 = container1.getContainerID().toString(); + String localizerId1 = container1.getContainer().getId().toString(); rls.getPrivateLocalizers().put( localizerId1, rls.new LocalizerRunner(new LocalizerContext(user, container1 - .getContainerID(), null), localizerId1)); + .getContainer().getId(), null), localizerId1)); LocalizerRunner localizerRunner1 = rls.getLocalizerRunner(localizerId1); dispatcher1.getEventHandler().handle( @@ -771,11 +774,11 @@ public class TestResourceLocalizationSer // Container - 2 now makes the request. ContainerImpl container2 = createMockContainer(user, 2); - String localizerId2 = container2.getContainerID().toString(); + String localizerId2 = container2.getContainer().getId().toString(); rls.getPrivateLocalizers().put( localizerId2, rls.new LocalizerRunner(new LocalizerContext(user, container2 - .getContainerID(), null), localizerId2)); + .getContainer().getId(), null), localizerId2)); LocalizerRunner localizerRunner2 = rls.getLocalizerRunner(localizerId2); dispatcher1.getEventHandler().handle( createContainerLocalizationEvent(container2, @@ -848,6 +851,163 @@ public class TestResourceLocalizationSer } } + @Test(timeout = 10000) + @SuppressWarnings("unchecked") + public void testLocalResourcePath() throws Exception { + + // test the local path where application and user cache files will be + // localized. + + DrainDispatcher dispatcher1 = null; + try { + dispatcher1 = new DrainDispatcher(); + String user = "testuser"; + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + + // mocked Resource Localization Service + Configuration conf = new Configuration(); + AbstractFileSystem spylfs = + spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); + final FileContext lfs = FileContext.getFileContext(spylfs, conf); + // We don't want files to be created + doNothing().when(spylfs).mkdir(isA(Path.class), isA(FsPermission.class), + anyBoolean()); + + // creating one local directory + List<Path> localDirs = new ArrayList<Path>(); + String[] sDirs = new String[1]; + for (int i = 0; i < 1; ++i) { + localDirs.add(lfs.makeQualified(new Path(basedir, i + ""))); + sDirs[i] = localDirs.get(i).toString(); + } + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + // setting log directory. + String logDir = + lfs.makeQualified(new Path(basedir, "logdir ")).toString(); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); + + LocalDirsHandlerService localDirHandler = new LocalDirsHandlerService(); + localDirHandler.init(conf); + // Registering event handlers + EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class); + dispatcher1.register(ApplicationEventType.class, applicationBus); + EventHandler<ContainerEvent> containerBus = mock(EventHandler.class); + dispatcher1.register(ContainerEventType.class, containerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + DeletionService delService = mock(DeletionService.class); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + // initializing directory handler. + dirsHandler.init(conf); + + dispatcher1.init(conf); + dispatcher1.start(); + + ResourceLocalizationService rls = + new ResourceLocalizationService(dispatcher1, exec, delService, + localDirHandler); + dispatcher1.register(LocalizationEventType.class, rls); + rls.init(conf); + + rls.handle(createApplicationLocalizationEvent(user, appId)); + + // We need to pre-populate the LocalizerRunner as the + // Resource Localization Service code internally starts them which + // definitely we don't want. + + // creating new container and populating corresponding localizer runner + + // Container - 1 + Container container1 = createMockContainer(user, 1); + String localizerId1 = container1.getContainer().getId().toString(); + rls.getPrivateLocalizers().put( + localizerId1, + rls.new LocalizerRunner(new LocalizerContext(user, container1 + .getContainer().getId(), null), localizerId1)); + + // Creating two requests for container + // 1) Private resource + // 2) Application resource + LocalResourceRequest reqPriv = + new LocalResourceRequest(new Path("file:///tmp1"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.PRIVATE, ""); + List<LocalResourceRequest> privList = + new ArrayList<LocalResourceRequest>(); + privList.add(reqPriv); + + LocalResourceRequest reqApp = + new LocalResourceRequest(new Path("file:///tmp2"), 123L, + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, ""); + List<LocalResourceRequest> appList = + new ArrayList<LocalResourceRequest>(); + appList.add(reqApp); + + Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs = + new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>(); + rsrcs.put(LocalResourceVisibility.APPLICATION, appList); + rsrcs.put(LocalResourceVisibility.PRIVATE, privList); + + dispatcher1.getEventHandler().handle( + new ContainerLocalizationRequestEvent(container1, rsrcs)); + + // Now waiting for resource download to start. Here actual will not start + // Only the resources will be populated into pending list. + Assert + .assertTrue(waitForPrivateDownloadToStart(rls, localizerId1, 2, 500)); + + // Validating user and application cache paths + + String userCachePath = + StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0) + .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user, + ContainerLocalizer.FILECACHE)); + String userAppCachePath = + StringUtils.join(Path.SEPARATOR, Arrays.asList(localDirs.get(0) + .toUri().getRawPath(), ContainerLocalizer.USERCACHE, user, + ContainerLocalizer.APPCACHE, appId.toString(), + ContainerLocalizer.FILECACHE)); + + // Now the Application and private resources may come in any order + // for download. + // For User cahce : + // returned destinationPath = user cache path + random number + // For App cache : + // returned destinationPath = user app cache path + random number + + int returnedResources = 0; + boolean appRsrc = false, privRsrc = false; + while (returnedResources < 2) { + LocalizerHeartbeatResponse response = + rls.heartbeat(createLocalizerStatus(localizerId1)); + for (ResourceLocalizationSpec resourceSpec : response + .getResourceSpecs()) { + returnedResources++; + Path destinationDirectory = + new Path(resourceSpec.getDestinationDirectory().getFile()); + if (resourceSpec.getResource().getVisibility() == + LocalResourceVisibility.APPLICATION) { + appRsrc = true; + Assert.assertEquals(userAppCachePath, destinationDirectory + .getParent().toUri().toString()); + } else if (resourceSpec.getResource().getVisibility() == + LocalResourceVisibility.PRIVATE) { + privRsrc = true; + Assert.assertEquals(userCachePath, destinationDirectory.getParent() + .toUri().toString()); + } else { + throw new Exception("Unexpected resource recevied."); + } + } + } + // We should receive both the resources (Application and Private) + Assert.assertTrue(appRsrc && privRsrc); + } finally { + if (dispatcher1 != null) { + dispatcher1.stop(); + } + } + } + private LocalizerStatus createLocalizerStatusForFailedResource( String localizerId, LocalResourceRequest req) { LocalizerStatus status = createLocalizerStatus(localizerId); @@ -1154,7 +1314,10 @@ public class TestResourceLocalizationSer private ContainerImpl createMockContainer(String user, int containerId) { ContainerImpl container = mock(ContainerImpl.class); - when(container.getContainerID()).thenReturn( + org.apache.hadoop.yarn.api.records.Container c = + mock(org.apache.hadoop.yarn.api.records.Container.class); + when(container.getContainer()).thenReturn(c); + when(container.getContainer().getId()).thenReturn( BuilderUtils.newContainerId(1, 1, 1, containerId)); when(container.getUser()).thenReturn(user); Credentials mockCredentials = mock(Credentials.class); @@ -1194,8 +1357,11 @@ public class TestResourceLocalizationSer ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1); ContainerId cId = BuilderUtils.newContainerId(appAttemptId, id); + org.apache.hadoop.yarn.api.records.Container containerAPI = + mock(org.apache.hadoop.yarn.api.records.Container.class); + when(c.getContainer()).thenReturn(containerAPI); when(c.getUser()).thenReturn("user0"); - when(c.getContainerID()).thenReturn(cId); + when(c.getContainer().getId()).thenReturn(cId); Credentials creds = new Credentials(); creds.addToken(new Text("tok" + id), getToken(id)); when(c.getCredentials()).thenReturn(creds);
Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1471229&r1=1471228&r2=1471229&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Wed Apr 24 02:21:58 2013 @@ -213,6 +213,9 @@ public class TestContainersMonitor exten cId.setApplicationAttemptId(appAttemptId); when(mockContainer.getId()).thenReturn(cId); + when(mockContainer.getNodeId()).thenReturn(context.getNodeId()); + when(mockContainer.getNodeHttpAddress()).thenReturn( + context.getNodeId().getHost() + ":12345"); containerLaunchContext.setUser(user); URL resource_alpha = Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java?rev=1471229&r1=1471228&r2=1471229&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java Wed Apr 24 02:21:58 2013 @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nod import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.util.BuilderUtils; +import static org.mockito.Mockito.*; public class MockContainer implements Container { @@ -48,6 +49,7 @@ public class MockContainer implements Co private final Map<Path, List<String>> resource = new HashMap<Path, List<String>>(); private RecordFactory recordFactory; + private org.apache.hadoop.yarn.api.records.Container mockContainer; public MockContainer(ApplicationAttemptId appAttemptId, Dispatcher dispatcher, Configuration conf, String user, @@ -62,6 +64,8 @@ public class MockContainer implements Co launchContext.setUser(user); this.state = ContainerState.NEW; + mockContainer = mock(org.apache.hadoop.yarn.api.records.Container.class); + when(mockContainer.getId()).thenReturn(id); } public void setState(ContainerState state) { @@ -69,11 +73,6 @@ public class MockContainer implements Co } @Override - public ContainerId getContainerID() { - return id; - } - - @Override public String getUser() { return user; } @@ -119,8 +118,7 @@ public class MockContainer implements Co } @Override - public Resource getResource() { - return null; + public org.apache.hadoop.yarn.api.records.Container getContainer() { + return this.mockContainer; } - } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java?rev=1471229&r1=1471228&r2=1471229&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java Wed Apr 24 02:21:58 2013 @@ -185,16 +185,18 @@ public class TestNMWebServicesApps exten app.getUser(), app.getAppId(), 1); Container container2 = new MockContainer(appAttemptId, dispatcher, conf, app.getUser(), app.getAppId(), 2); - nmContext.getContainers().put(container1.getContainerID(), container1); - nmContext.getContainers().put(container2.getContainerID(), container2); + nmContext.getContainers() + .put(container1.getContainer().getId(), container1); + nmContext.getContainers() + .put(container2.getContainer().getId(), container2); - app.getContainers().put(container1.getContainerID(), container1); - app.getContainers().put(container2.getContainerID(), container2); + app.getContainers().put(container1.getContainer().getId(), container1); + app.getContainers().put(container2.getContainer().getId(), container2); HashMap<String, String> hash = new HashMap<String, String>(); - hash.put(container1.getContainerID().toString(), container1 - .getContainerID().toString()); - hash.put(container2.getContainerID().toString(), container2 - .getContainerID().toString()); + hash.put(container1.getContainer().getId().toString(), container1 + .getContainer().getId().toString()); + hash.put(container2.getContainer().getId().toString(), container2 + .getContainer().getId().toString()); return hash; } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java?rev=1471229&r1=1471228&r2=1471229&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java Wed Apr 24 02:21:58 2013 @@ -186,16 +186,18 @@ public class TestNMWebServicesContainers app.getUser(), app.getAppId(), 1); Container container2 = new MockContainer(appAttemptId, dispatcher, conf, app.getUser(), app.getAppId(), 2); - nmContext.getContainers().put(container1.getContainerID(), container1); - nmContext.getContainers().put(container2.getContainerID(), container2); + nmContext.getContainers() + .put(container1.getContainer().getId(), container1); + nmContext.getContainers() + .put(container2.getContainer().getId(), container2); - app.getContainers().put(container1.getContainerID(), container1); - app.getContainers().put(container2.getContainerID(), container2); + app.getContainers().put(container1.getContainer().getId(), container1); + app.getContainers().put(container2.getContainer().getId(), container2); HashMap<String, String> hash = new HashMap<String, String>(); - hash.put(container1.getContainerID().toString(), container1 - .getContainerID().toString()); - hash.put(container2.getContainerID().toString(), container2 - .getContainerID().toString()); + hash.put(container1.getContainer().getId().toString(), container1 + .getContainer().getId().toString()); + hash.put(container2.getContainer().getId().toString(), container2 + .getContainer().getId().toString()); return hash; } @@ -468,7 +470,7 @@ public class TestNMWebServicesContainers String state, String user, int exitCode, String diagnostics, String nodeId, int totalMemoryNeededMB, String logsLink) throws JSONException, Exception { - WebServicesTestUtils.checkStringMatch("id", cont.getContainerID() + WebServicesTestUtils.checkStringMatch("id", cont.getContainer().getId() .toString(), id); WebServicesTestUtils.checkStringMatch("state", cont.getContainerState() .toString(), state); @@ -481,8 +483,9 @@ public class TestNMWebServicesContainers WebServicesTestUtils.checkStringMatch("nodeId", nmContext.getNodeId() .toString(), nodeId); assertEquals("totalMemoryNeededMB wrong", 0, totalMemoryNeededMB); - String shortLink = ujoin("containerlogs", cont.getContainerID().toString(), - cont.getUser()); + String shortLink = + ujoin("containerlogs", cont.getContainer().getId().toString(), + cont.getUser()); assertTrue("containerLogsLink wrong", logsLink.contains(shortLink)); } Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1471229&r1=1471228&r2=1471229&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Wed Apr 24 02:21:58 2013 @@ -178,17 +178,7 @@ public class AMLauncher implements Runna Map<String, String> environment = container.getEnvironment(); environment.put(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV, application.getWebProxyBase()); - // Set the AppAttemptId, containerId, NMHTTPAdress, AppSubmitTime to be - // consumable by the AM. - environment.put(ApplicationConstants.AM_CONTAINER_ID_ENV, - containerID.toString()); - environment.put(ApplicationConstants.NM_HOST_ENV, masterContainer - .getNodeId().getHost()); - environment.put(ApplicationConstants.NM_PORT_ENV, - String.valueOf(masterContainer.getNodeId().getPort())); - String parts[] = - masterContainer.getNodeHttpAddress().split(":"); - environment.put(ApplicationConstants.NM_HTTP_PORT_ENV, parts[1]); + // Set AppSubmitTime and MaxAppAttempts to be consumable by the AM. ApplicationId applicationId = application.getAppAttemptId().getApplicationId(); environment.put( Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1471229&r1=1471228&r2=1471229&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Wed Apr 24 02:21:58 2013 @@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; + /** * Service to renew application delegation tokens. */ @@ -139,7 +141,8 @@ public class DelegationTokenRenewer exte * class that is used for keeping tracks of DT to renew * */ - private static class DelegationTokenToRenew { + @VisibleForTesting + protected static class DelegationTokenToRenew { public final Token<?> token; public final ApplicationId applicationId; public final Configuration conf; @@ -252,7 +255,16 @@ public class DelegationTokenRenewer exte private void addTokenToList(DelegationTokenToRenew t) { delegationTokens.add(t); } - + + @VisibleForTesting + public Set<Token<?>> getDelegationTokens() { + Set<Token<?>> tokens = new HashSet<Token<?>>(); + for(DelegationTokenToRenew delegationToken : delegationTokens) { + tokens.add(delegationToken.token); + } + return tokens; + } + /** * Add application tokens for renewal. * @param applicationId added application @@ -343,7 +355,8 @@ public class DelegationTokenRenewer exte /** * set task to renew the token */ - private void setTimerForTokenRenewal(DelegationTokenToRenew token) + @VisibleForTesting + protected void setTimerForTokenRenewal(DelegationTokenToRenew token) throws IOException { // calculate timer time @@ -358,7 +371,8 @@ public class DelegationTokenRenewer exte } // renew a token - private void renewToken(final DelegationTokenToRenew dttr) + @VisibleForTesting + protected void renewToken(final DelegationTokenToRenew dttr) throws IOException { // need to use doAs so that http can find the kerberos tgt // NOTE: token renewers should be responsible for the correct UGI! Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1471229&r1=1471228&r2=1471229&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Wed Apr 24 02:21:58 2013 @@ -18,12 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.nio.ByteBuffer; import java.security.PrivilegedAction; import java.util.Map; import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ClientRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; @@ -41,6 +44,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -130,26 +134,26 @@ public class MockRM extends ResourceMana public RMApp submitApp(int masterMemory, String name, String user) throws Exception { return submitApp(masterMemory, name, user, null, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null); } public RMApp submitApp(int masterMemory, String name, String user, Map<ApplicationAccessType, String> acls) throws Exception { return submitApp(masterMemory, name, user, acls, false, null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null); } public RMApp submitApp(int masterMemory, String name, String user, Map<ApplicationAccessType, String> acls, String queue) throws Exception { return submitApp(masterMemory, name, user, acls, false, queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null); } public RMApp submitApp(int masterMemory, String name, String user, Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue, - int maxAppAttempts) throws Exception { + int maxAppAttempts, Credentials ts) throws Exception { ClientRMProtocol client = getClientRMService(); GetNewApplicationResponse resp = client.getNewApplication(Records .newRecord(GetNewApplicationRequest.class)); @@ -175,6 +179,12 @@ public class MockRM extends ResourceMana sub.setResource(capability); clc.setApplicationACLs(acls); clc.setUser(user); + if (ts != null && UserGroupInformation.isSecurityEnabled()) { + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + clc.setContainerTokens(securityTokens); + } sub.setAMContainerSpec(clc); req.setApplicationSubmissionContext(sub); UserGroupInformation fakeUser = @@ -357,6 +367,10 @@ public class MockRM extends ResourceMana return this.nodesListManager; } + public RMDelegationTokenSecretManager getRMDTSecretManager() { + return this.rmDTSecretManager; + } + @Override protected void startWepApp() { // override to disable webapp Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java?rev=1471229&r1=1471228&r2=1471229&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java Wed Apr 24 02:21:58 2013 @@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.exceptions import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -71,17 +70,17 @@ public class TestApplicationMasterLaunch launched = true; Map<String, String> env = request.getContainerLaunchContext().getEnvironment(); - containerIdAtContainerManager = - env.get(ApplicationConstants.AM_CONTAINER_ID_ENV); ContainerId containerId = - ConverterUtils.toContainerId(containerIdAtContainerManager); + request.getContainer().getId(); + containerIdAtContainerManager = containerId.toString(); attemptIdAtContainerManager = containerId.getApplicationAttemptId().toString(); - nmHostAtContainerManager = env.get(ApplicationConstants.NM_HOST_ENV); + nmHostAtContainerManager = request.getContainer().getNodeId().getHost(); nmPortAtContainerManager = - Integer.parseInt(env.get(ApplicationConstants.NM_PORT_ENV)); + request.getContainer().getNodeId().getPort(); nmHttpPortAtContainerManager = - Integer.parseInt(env.get(ApplicationConstants.NM_HTTP_PORT_ENV)); + Integer.parseInt(request.getContainer().getNodeHttpAddress() + .split(":")[1]); submitTimeAtContainerManager = Long.parseLong(env.get(ApplicationConstants.APP_SUBMIT_TIME_ENV)); maxAppAttempts = Modified: hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1471229&r1=1471228&r2=1471229&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Wed Apr 24 02:21:58 2013 @@ -18,11 +18,21 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -33,9 +43,11 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -43,6 +55,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -60,10 +74,8 @@ public class TestRMRestart { YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, - "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); - conf.set(YarnConfiguration.RM_SCHEDULER, - "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -159,7 +171,7 @@ public class TestRMRestart { // create unmanaged app RMApp appUnmanaged = rm1.submitApp(200, "someApp", "someUser", null, true, null, conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, - YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null); ApplicationAttemptId unmanagedAttemptId = appUnmanaged.getCurrentAppAttempt().getAppAttemptId(); // assert appUnmanaged info is saved @@ -321,8 +333,7 @@ public class TestRMRestart { YarnConfiguration conf = new YarnConfiguration(); conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); - conf.set(YarnConfiguration.RM_STORE, - "org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -340,10 +351,12 @@ public class TestRMRestart { // submit an app with maxAppAttempts equals to 1 RMApp app1 = rm1.submitApp(200, "name", "user", - new HashMap<ApplicationAccessType, String>(), false, "default", 1); + new HashMap<ApplicationAccessType, String>(), false, "default", 1, + null); // submit an app with maxAppAttempts equals to -1 RMApp app2 = rm1.submitApp(200, "name", "user", - new HashMap<ApplicationAccessType, String>(), false, "default", -1); + new HashMap<ApplicationAccessType, String>(), false, "default", -1, + null); // assert app1 info is saved ApplicationState appState = rmAppState.get(app1.getApplicationId()); @@ -389,4 +402,113 @@ public class TestRMRestart { rm1.stop(); rm2.stop(); } + + @Test + public void testTokenRestoredOnRMrestart() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + ExitUtil.disableSystemExit(); + + YarnConfiguration conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + + Map<ApplicationId, ApplicationState> rmAppState = + rmState.getApplicationState(); + MockRM rm1 = new MyMockRM(conf, memStore); + rm1.start(); + + HashSet<Token<RMDelegationTokenIdentifier>> tokenSet = + new HashSet<Token<RMDelegationTokenIdentifier>>(); + + // create an empty credential + Credentials ts = new Credentials(); + + // create tokens and add into credential + Text userText1 = new Text("user1"); + RMDelegationTokenIdentifier dtId1 = + new RMDelegationTokenIdentifier(userText1, new Text("renewer1"), + userText1); + Token<RMDelegationTokenIdentifier> token1 = + new Token<RMDelegationTokenIdentifier>(dtId1, + rm1.getRMDTSecretManager()); + ts.addToken(userText1, token1); + tokenSet.add(token1); + + Text userText2 = new Text("user2"); + RMDelegationTokenIdentifier dtId2 = + new RMDelegationTokenIdentifier(userText2, new Text("renewer2"), + userText2); + Token<RMDelegationTokenIdentifier> token2 = + new Token<RMDelegationTokenIdentifier>(dtId2, + rm1.getRMDTSecretManager()); + ts.addToken(userText2, token2); + tokenSet.add(token2); + + // submit an app with customized credential + RMApp app = rm1.submitApp(200, "name", "user", + new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts); + + // assert app info is saved + ApplicationState appState = rmAppState.get(app.getApplicationId()); + Assert.assertNotNull(appState); + + // assert delegation tokens are saved + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = + ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + Assert.assertEquals(securityTokens, appState + .getApplicationSubmissionContext().getAMContainerSpec() + .getContainerTokens()); + + // start new RM + MockRM rm2 = new MyMockRM(conf, memStore); + rm2.start(); + + // verify tokens are properly populated back to DelegationTokenRenewer + Assert.assertEquals(tokenSet, rm1.getRMContext() + .getDelegationTokenRenewer().getDelegationTokens()); + + // stop the RM + rm1.stop(); + rm2.stop(); + } + + class MyMockRM extends MockRM { + + public MyMockRM(Configuration conf, RMStateStore store) { + super(conf, store); + } + + @Override + protected void doSecureLogin() throws IOException { + // Do nothing. + } + + @Override + protected DelegationTokenRenewer createDelegationTokenRenewer() { + return new DelegationTokenRenewer() { + @Override + protected void renewToken(final DelegationTokenToRenew dttr) + throws IOException { + // Do nothing + } + + @Override + protected void setTimerForTokenRenewal(DelegationTokenToRenew token) + throws IOException { + // Do nothing + } + }; + } + } }
