Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1494369&r1=1494368&r2=1494369&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Jun 18 23:19:49 2013 @@ -380,7 +380,7 @@ public class ApplicationMasterService ex // Adding NMTokens for allocated containers. if (!allocation.getContainers().isEmpty()) { allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager() - .getNMTokens(app.getUser(), appAttemptId, + .createAndGetNMTokens(app.getUser(), appAttemptId, allocation.getContainers())); } return allocateResponse;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1494369&r1=1494368&r2=1494369&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Tue Jun 18 23:19:49 2013 @@ -131,21 +131,30 @@ public class AMLauncher implements Runna final YarnRPC rpc = YarnRPC.create(conf); // TODO: Don't create again and again. - UserGroupInformation currentUser = UserGroupInformation - .createRemoteUser(containerId.toString()); - if (UserGroupInformation.isSecurityEnabled()) { - Token<ContainerTokenIdentifier> token = - ConverterUtils.convertFromYarn(masterContainer - .getContainerToken(), containerManagerBindAddress); - currentUser.addToken(token); - } - return currentUser.doAs(new PrivilegedAction<ContainerManagementProtocol>() { - @Override - public ContainerManagementProtocol run() { - return (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class, - containerManagerBindAddress, conf); - } - }); + UserGroupInformation currentUser = + UserGroupInformation.createRemoteUser(containerId + .getApplicationAttemptId().toString()); + + String user = + rmContext.getRMApps() + .get(containerId.getApplicationAttemptId().getApplicationId()) + .getUser(); + org.apache.hadoop.yarn.api.records.Token token = + rmContext.getNMTokenSecretManager().createNMToken( + containerId.getApplicationAttemptId(), node, user); + currentUser.addToken(ConverterUtils.convertFromYarn(token, + containerManagerBindAddress)); + + return currentUser + .doAs(new PrivilegedAction<ContainerManagementProtocol>() { + + @Override + public ContainerManagementProtocol run() { + return (ContainerManagementProtocol) rpc.getProxy( + ContainerManagementProtocol.class, + containerManagerBindAddress, conf); + } + }); } private ContainerLaunchContext createAMContainerLaunchContext( @@ -234,7 +243,13 @@ public class AMLauncher implements Runna } catch(IOException ie) { LOG.info("Error cleaning master ", ie); } catch (YarnException e) { - LOG.info("Error cleaning master ", e); + StringBuilder sb = new StringBuilder("Container "); + sb.append(masterContainer.getId().toString()); + sb.append(" is not handled by this NodeManager"); + if (!e.getMessage().contains(sb.toString())) { + // Ignoring if container is already killed by Node Manager. + LOG.info("Error cleaning master ", e); + } } break; default: Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java?rev=1494369&r1=1494368&r2=1494369&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java Tue Jun 18 23:19:49 2013 @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -31,16 +30,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.security.MasterKeyData; @@ -183,7 +178,7 @@ public class NMTokenSecretManagerInRM ex } } - public List<NMToken> getNMTokens(String applicationSubmitter, + public List<NMToken> createAndGetNMTokens(String applicationSubmitter, ApplicationAttemptId appAttemptId, List<Container> containers) { try { this.readLock.lock(); @@ -193,12 +188,14 @@ public class NMTokenSecretManagerInRM ex for (Container container : containers) { if (!nodeSet.contains(container.getNodeId())) { LOG.debug("Sending NMToken for nodeId : " - + container.getNodeId().toString()); + + container.getNodeId().toString() + + " for application attempt : " + appAttemptId.toString()); Token token = createNMToken(appAttemptId, container.getNodeId(), applicationSubmitter); NMToken nmToken = NMToken.newInstance(container.getNodeId(), token); nmTokens.add(nmToken); + // This will update the nmToken set. nodeSet.add(container.getNodeId()); } } @@ -273,38 +270,4 @@ public class NMTokenSecretManagerInRM ex this.writeLock.unlock(); } } - - public static Token newNMToken(byte[] password, - NMTokenIdentifier identifier) { - NodeId nodeId = identifier.getNodeId(); - // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = - NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort()); - Token nmToken = - Token.newInstance(identifier.getBytes(), - NMTokenIdentifier.KIND.toString(), password, SecurityUtil - .buildTokenService(addr).toString()); - return nmToken; - } - - /** - * Helper function for creating NMTokens. - */ - public Token createNMToken(ApplicationAttemptId applicationAttemptId, - NodeId nodeId, String applicationSubmitter) { - byte[] password; - NMTokenIdentifier identifier; - - this.readLock.lock(); - try { - identifier = - new NMTokenIdentifier(applicationAttemptId, nodeId, - applicationSubmitter, this.currentMasterKey.getMasterKey() - .getKeyId()); - password = this.createPassword(identifier); - } finally { - this.readLock.unlock(); - } - return newNMToken(password, identifier); - } } Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1494369&r1=1494368&r2=1494369&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Tue Jun 18 23:19:49 2013 @@ -59,9 +59,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; -import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -310,6 +308,7 @@ public class MockRM extends ResourceMana Configuration conf = new Configuration(); containerTokenSecretManager.rollMasterKey(); + nmTokenSecretManager.rollMasterKey(); return new ResourceTrackerService(getRMContext(), nodesListManager, this.nmLivelinessMonitor, containerTokenSecretManager, nmTokenSecretManager) { Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java?rev=1494369&r1=1494368&r2=1494369&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java Tue Jun 18 23:19:49 2013 @@ -22,14 +22,7 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.security.PrivilegedAction; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import junit.framework.Assert; @@ -37,45 +30,30 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Shell; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Test; @@ -111,15 +89,15 @@ public class TestContainerManagerSecurit yarnCluster.init(conf); yarnCluster.start(); - // Testing for authenticated user - testAuthenticatedUser(); + // TestNMTokens. + testNMTokens(conf); - // Testing for malicious user - testMaliceUser(); - - // Testing for usage of expired tokens - testExpiredTokens(); + // Testing for container token tampering + testContainerToken(conf); + } catch (Exception e) { + e.printStackTrace(); + throw e; } finally { if (yarnCluster != null) { yarnCluster.stop(); @@ -128,57 +106,264 @@ public class TestContainerManagerSecurit } } - private void testAuthenticatedUser() throws IOException, - InterruptedException, YarnException { + private void testNMTokens(Configuration conf) throws Exception { + NMTokenSecretManagerInRM nmTokenSecretManagerRM = + yarnCluster.getResourceManager().getRMContext() + .getNMTokenSecretManager(); + NMTokenSecretManagerInNM nmTokenSecretManagerNM = + yarnCluster.getNodeManager(0).getNMContext().getNMTokenSecretManager(); + RMContainerTokenSecretManager containerTokenSecretManager = + yarnCluster.getResourceManager().getRMContainerTokenSecretManager(); + + NodeManager nm = yarnCluster.getNodeManager(0); + + waitForNMToReceiveNMTokenKey(nmTokenSecretManagerNM, nm); + + // Both id should be equal. + Assert.assertEquals(nmTokenSecretManagerNM.getCurrentKey().getKeyId(), + nmTokenSecretManagerRM.getCurrentKey().getKeyId()); + + /* + * Below cases should be tested. + * 1) If Invalid NMToken is used then it should be rejected. + * 2) If valid NMToken but belonging to another Node is used then that + * too should be rejected. + * 3) NMToken for say appAttempt-1 is used for starting/stopping/retrieving + * status for container with containerId for say appAttempt-2 should + * be rejected. + * 4) After start container call is successful nmtoken should have been + * saved in NMTokenSecretManagerInNM. + * 5) If start container call was successful (no matter if container is + * still running or not), appAttempt->NMToken should be present in + * NMTokenSecretManagerInNM's cache. Any future getContainerStatus call + * for containerId belonging to that application attempt using + * applicationAttempt's older nmToken should not get any invalid + * nmToken error. (This can be best tested if we roll over NMToken + * master key twice). + */ + YarnRPC rpc = YarnRPC.create(conf); + String user = "test"; + Resource r = Resource.newInstance(1024, 1); + + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId validAppAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + ApplicationAttemptId invalidAppAttemptId = + ApplicationAttemptId.newInstance(appId, 2); + + ContainerId validContainerId = + ContainerId.newInstance(validAppAttemptId, 0); + + NodeId validNode = yarnCluster.getNodeManager(0).getNMContext().getNodeId(); + NodeId invalidNode = NodeId.newInstance("InvalidHost", 1234); - LOG.info("Running test for authenticated user"); + + org.apache.hadoop.yarn.api.records.Token validNMToken = + nmTokenSecretManagerRM.createNMToken(validAppAttemptId, validNode, user); + + org.apache.hadoop.yarn.api.records.Token validContainerToken = + containerTokenSecretManager.createContainerToken(validContainerId, + validNode, user, r); + + StringBuilder sb; + // testInvalidNMToken ... creating NMToken using different secret manager. + + NMTokenSecretManagerInRM tempManager = new NMTokenSecretManagerInRM(conf); + tempManager.rollMasterKey(); + do { + tempManager.rollMasterKey(); + tempManager.activateNextMasterKey(); + // Making sure key id is different. + } while (tempManager.getCurrentKey().getKeyId() == nmTokenSecretManagerRM + .getCurrentKey().getKeyId()); + + org.apache.hadoop.yarn.api.records.Token invalidNMToken = + tempManager.createNMToken(validAppAttemptId, validNode, user); + sb = new StringBuilder("Given NMToken for application : "); + sb.append(validAppAttemptId.toString()) + .append(" seems to have been generated illegally."); + Assert.assertTrue(sb.toString().contains( + testStartContainer(rpc, validAppAttemptId, validNode, + validContainerToken, invalidNMToken, true))); + + // valid NMToken but belonging to other node + invalidNMToken = + nmTokenSecretManagerRM.createNMToken(validAppAttemptId, invalidNode, + user); + sb = new StringBuilder("Given NMToken for application : "); + sb.append(validAppAttemptId) + .append(" is not valid for current node manager.expected : ") + .append(validNode.toString()) + .append(" found : ").append(invalidNode.toString()); + Assert.assertTrue(sb.toString().contains( + testStartContainer(rpc, validAppAttemptId, validNode, + validContainerToken, invalidNMToken, true))); + + // using appAttempt-2 token for launching container for appAttempt-1. + invalidNMToken = + nmTokenSecretManagerRM.createNMToken(invalidAppAttemptId, validNode, + user); + sb = new StringBuilder("\nNMToken for application attempt : "); + sb.append(invalidAppAttemptId.toString()) + .append(" was used for starting container with container token") + .append(" issued for application attempt : ") + .append(validAppAttemptId.toString()); + Assert.assertTrue(testStartContainer(rpc, validAppAttemptId, validNode, + validContainerToken, invalidNMToken, true).contains(sb.toString())); + + // using correct tokens. nmtoken for appattempt should get saved. + testStartContainer(rpc, validAppAttemptId, validNode, validContainerToken, + validNMToken, false); + Assert.assertTrue(nmTokenSecretManagerNM + .isAppAttemptNMTokenKeyPresent(validAppAttemptId)); + + // Rolling over master key twice so that we can check whether older keys + // are used for authentication. + rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); + // Key rolled over once.. rolling over again + rollNMTokenMasterKey(nmTokenSecretManagerRM, nmTokenSecretManagerNM); + + // trying get container status. Now saved nmToken should be used for + // authentication. + sb = new StringBuilder("Container "); + sb.append(validContainerId.toString()); + sb.append(" is not handled by this NodeManager"); + Assert.assertTrue(testGetContainer(rpc, validAppAttemptId, validNode, + validContainerId, validNMToken, false).contains(sb.toString())); + + } - ResourceManager resourceManager = yarnCluster.getResourceManager(); + protected void waitForNMToReceiveNMTokenKey( + NMTokenSecretManagerInNM nmTokenSecretManagerNM, NodeManager nm) + throws InterruptedException { + int attempt = 60; + ContainerManagerImpl cm = + ((ContainerManagerImpl) nm.getNMContext().getContainerManager()); + while ((cm.getBlockNewContainerRequestsStatus() || nmTokenSecretManagerNM + .getNodeId() == null) && attempt-- > 0) { + Thread.sleep(2000); + } + } - final YarnRPC yarnRPC = YarnRPC.create(conf); + protected void rollNMTokenMasterKey( + NMTokenSecretManagerInRM nmTokenSecretManagerRM, + NMTokenSecretManagerInNM nmTokenSecretManagerNM) throws Exception { + int oldKeyId = nmTokenSecretManagerRM.getCurrentKey().getKeyId(); + nmTokenSecretManagerRM.rollMasterKey(); + int interval = 40; + while (nmTokenSecretManagerNM.getCurrentKey().getKeyId() == oldKeyId + && interval-- > 0) { + Thread.sleep(1000); + } + nmTokenSecretManagerRM.activateNextMasterKey(); + Assert.assertTrue((nmTokenSecretManagerNM.getCurrentKey().getKeyId() + == nmTokenSecretManagerRM.getCurrentKey().getKeyId())); + } - // Submit an application - ApplicationId appID = resourceManager.getClientRMService() - .getNewApplication(Records.newRecord(GetNewApplicationRequest.class)) - .getApplicationId(); - ApplicationMasterProtocol scheduler = submitAndRegisterApplication(resourceManager, - yarnRPC, appID); - - // Now request a container. - final Container allocatedContainer = requestAndGetContainer(scheduler, - appID); - - // Now talk to the NM for launching the container. - final ContainerId containerID = allocatedContainer.getId(); - UserGroupInformation authenticatedUser = UserGroupInformation - .createRemoteUser(containerID.toString()); - org.apache.hadoop.yarn.api.records.Token containerToken = - allocatedContainer.getContainerToken(); - Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>( - containerToken.getIdentifier().array(), containerToken.getPassword() - .array(), new Text(containerToken.getKind()), new Text( - containerToken.getService())); - authenticatedUser.addToken(token); - authenticatedUser.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - ContainerManagementProtocol client = (ContainerManagementProtocol) yarnRPC.getProxy( - ContainerManagementProtocol.class, NetUtils - .createSocketAddr(allocatedContainer.getNodeId().toString()), - conf); - LOG.info("Going to make a legal stopContainer() request"); - StopContainerRequest request = recordFactory - .newRecordInstance(StopContainerRequest.class); - request.setContainerId(containerID); - client.stopContainer(request); - return null; + private String testGetContainer(YarnRPC rpc, + ApplicationAttemptId appAttemptId, NodeId nodeId, + ContainerId containerId, + org.apache.hadoop.yarn.api.records.Token nmToken, + boolean isExceptionExpected) { + try { + getContainerStatus(rpc, nmToken, containerId, appAttemptId, nodeId, + isExceptionExpected); + if (isExceptionExpected) { + fail("Exception was expected!!"); } - }); + return ""; + } catch (Exception e) { + e.printStackTrace(); + return e.getMessage(); + } + } - KillApplicationRequest request = Records - .newRecord(KillApplicationRequest.class); - request.setApplicationId(appID); - resourceManager.getClientRMService().forceKillApplication(request); + protected String testStartContainer(YarnRPC rpc, + ApplicationAttemptId appAttemptId, NodeId nodeId, + org.apache.hadoop.yarn.api.records.Token containerToken, + org.apache.hadoop.yarn.api.records.Token nmToken, + boolean isExceptionExpected) { + try { + startContainer(rpc, nmToken, containerToken, nodeId, + appAttemptId.toString()); + if (isExceptionExpected){ + fail("Exception was expected!!"); + } + return ""; + } catch (Exception e) { + e.printStackTrace(); + return e.getMessage(); + } + } + + private void + getContainerStatus(YarnRPC rpc, + org.apache.hadoop.yarn.api.records.Token nmToken, + ContainerId containerId, + ApplicationAttemptId appAttemptId, NodeId nodeId, + boolean isExceptionExpected) throws Exception { + GetContainerStatusRequest request = + Records.newRecord(GetContainerStatusRequest.class); + request.setContainerId(containerId); + + ContainerManagementProtocol proxy = null; + + try { + proxy = + getContainerManagementProtocolProxy(rpc, nmToken, nodeId, + appAttemptId.toString()); + proxy.getContainerStatus(request); + + } finally { + if (proxy != null) { + rpc.stopProxy(proxy, conf); + } + } + } + + private void startContainer(final YarnRPC rpc, + org.apache.hadoop.yarn.api.records.Token nmToken, + org.apache.hadoop.yarn.api.records.Token containerToken, + NodeId nodeId, String user) throws Exception { + + StartContainerRequest request = + Records.newRecord(StartContainerRequest.class); + request.setContainerToken(containerToken); + ContainerLaunchContext context = + Records.newRecord(ContainerLaunchContext.class); + request.setContainerLaunchContext(context); + + ContainerManagementProtocol proxy = null; + try { + proxy = getContainerManagementProtocolProxy(rpc, nmToken, nodeId, user); + proxy.startContainer(request); + } finally { + if (proxy != null) { + rpc.stopProxy(proxy, conf); + } + } + } + + protected ContainerManagementProtocol getContainerManagementProtocolProxy( + final YarnRPC rpc, org.apache.hadoop.yarn.api.records.Token nmToken, + NodeId nodeId, String user) { + ContainerManagementProtocol proxy; + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + final InetSocketAddress addr = + NetUtils.createSocketAddr(nodeId.getHost(), nodeId.getPort()); + ugi.addToken(ConverterUtils.convertFromYarn(nmToken, addr)); + + proxy = ugi + .doAs(new PrivilegedAction<ContainerManagementProtocol>() { + + @Override + public ContainerManagementProtocol run() { + return (ContainerManagementProtocol) rpc.getProxy( + ContainerManagementProtocol.class, + addr, conf); + } + }); + return proxy; } /** @@ -190,349 +375,61 @@ public class TestContainerManagerSecurit * @throws InterruptedException * @throws YarnException */ - private void testMaliceUser() throws IOException, InterruptedException, - YarnException { + private void testContainerToken(Configuration conf) throws IOException, + InterruptedException, YarnException { LOG.info("Running test for malice user"); - - ResourceManager resourceManager = yarnCluster.getResourceManager(); - - final YarnRPC yarnRPC = YarnRPC.create(conf); - - // Submit an application - ApplicationId appID = resourceManager.getClientRMService() - .getNewApplication(Records.newRecord(GetNewApplicationRequest.class)) - .getApplicationId(); - ApplicationMasterProtocol scheduler = submitAndRegisterApplication(resourceManager, - yarnRPC, appID); - - // Now request a container. - final Container allocatedContainer = requestAndGetContainer(scheduler, - appID); - - // Now talk to the NM for launching the container with modified resource - - org.apache.hadoop.yarn.api.records.Token containerToken = - allocatedContainer.getContainerToken(); - ContainerTokenIdentifier originalContainerTokenId = - BuilderUtils.newContainerTokenIdentifier(containerToken); - - // Malice user modifies the resource amount - Resource modifiedResource = BuilderUtils.newResource(2048, 1); - ContainerTokenIdentifier modifiedIdentifier = - new ContainerTokenIdentifier(originalContainerTokenId.getContainerID(), - originalContainerTokenId.getNmHostAddress(), "testUser", - modifiedResource, Long.MAX_VALUE, - originalContainerTokenId.getMasterKeyId(), - ResourceManager.clusterTimeStamp); - Token<ContainerTokenIdentifier> modifiedToken = - new Token<ContainerTokenIdentifier>(modifiedIdentifier.getBytes(), - containerToken.getPassword().array(), new Text( - containerToken.getKind()), new Text(containerToken.getService())); - makeTamperedStartContainerCall(yarnRPC, allocatedContainer, - modifiedIdentifier, modifiedToken); - - // Malice user modifies the container-Id - ContainerId newContainerId = - BuilderUtils.newContainerId( - BuilderUtils.newApplicationAttemptId(originalContainerTokenId - .getContainerID().getApplicationAttemptId().getApplicationId(), 1), - originalContainerTokenId.getContainerID().getId() + 42); - modifiedIdentifier = - new ContainerTokenIdentifier(newContainerId, - originalContainerTokenId.getNmHostAddress(), "testUser", - originalContainerTokenId.getResource(), Long.MAX_VALUE, - originalContainerTokenId.getMasterKeyId(), - ResourceManager.clusterTimeStamp); - modifiedToken = - new Token<ContainerTokenIdentifier>(modifiedIdentifier.getBytes(), - containerToken.getPassword().array(), new Text( - containerToken.getKind()), new Text(containerToken.getService())); - makeTamperedStartContainerCall(yarnRPC, allocatedContainer, - modifiedIdentifier, modifiedToken); - - // Similarly messing with anything else will fail. - - KillApplicationRequest request = Records - .newRecord(KillApplicationRequest.class); - request.setApplicationId(appID); - resourceManager.getClientRMService().forceKillApplication(request); - } - - private void makeTamperedStartContainerCall(final YarnRPC yarnRPC, - final Container allocatedContainer, - final ContainerTokenIdentifier modifiedIdentifier, - Token<ContainerTokenIdentifier> modifiedToken) { - final ContainerId containerID = allocatedContainer.getId(); - UserGroupInformation maliceUser = UserGroupInformation - .createRemoteUser(containerID.toString()); - maliceUser.addToken(modifiedToken); - maliceUser.doAs(new PrivilegedAction<Void>() { - @Override - public Void run() { - ContainerManagementProtocol client = (ContainerManagementProtocol) yarnRPC.getProxy( - ContainerManagementProtocol.class, NetUtils - .createSocketAddr(allocatedContainer.getNodeId().toString()), - conf); - - LOG.info("Going to contact NM: ilLegal request"); - StartContainerRequest request = - Records.newRecord(StartContainerRequest.class); - try { - request.setContainerToken(allocatedContainer.getContainerToken()); - ContainerLaunchContext context = - createContainerLaunchContextForTest(modifiedIdentifier); - request.setContainerLaunchContext(context); - client.startContainer(request); - fail("Connection initiation with illegally modified " - + "tokens is expected to fail."); - } catch (YarnException e) { - LOG.error("Got exception", e); - fail("Cannot get a YARN remote exception as " - + "it will indicate RPC success"); - } catch (Exception e) { - Assert.assertEquals( - javax.security.sasl.SaslException.class - .getCanonicalName(), e.getClass().getCanonicalName()); - Assert.assertTrue(e - .getMessage() - .contains( - "DIGEST-MD5: digest response format violation. " - + "Mismatched response.")); - } - return null; - } - }); - } - - private void testExpiredTokens() throws IOException, InterruptedException, - YarnException { - - LOG.info("\n\nRunning test for malice user"); - - ResourceManager resourceManager = yarnCluster.getResourceManager(); - - final YarnRPC yarnRPC = YarnRPC.create(conf); - - // Submit an application - final ApplicationId appID = resourceManager.getClientRMService() - .getNewApplication(Records.newRecord(GetNewApplicationRequest.class)) - .getApplicationId(); - ApplicationMasterProtocol scheduler = submitAndRegisterApplication(resourceManager, - yarnRPC, appID); - - // Now request a container. - final Container allocatedContainer = requestAndGetContainer(scheduler, - appID); - - // Now talk to the NM for launching the container with modified containerID - final ContainerId containerID = allocatedContainer.getId(); - - org.apache.hadoop.yarn.api.records.Token containerToken = - allocatedContainer.getContainerToken(); - final ContainerTokenIdentifier tokenId = - BuilderUtils.newContainerTokenIdentifier(containerToken); - - /////////// Test calls with expired tokens - UserGroupInformation unauthorizedUser = UserGroupInformation - .createRemoteUser(containerID.toString()); - - RMContainerTokenSecretManager containerTokenSecreteManager = - resourceManager.getRMContainerTokenSecretManager(); - final ContainerTokenIdentifier newTokenId = - new ContainerTokenIdentifier(tokenId.getContainerID(), - tokenId.getNmHostAddress(), tokenId.getApplicationSubmitter(), - tokenId.getResource(), System.currentTimeMillis() - 1, - containerTokenSecreteManager.getCurrentKey().getKeyId(), - ResourceManager.clusterTimeStamp); - final byte[] passowrd = - containerTokenSecreteManager.createPassword( - newTokenId); - // Create a valid token by using the key from the RM. - Token<ContainerTokenIdentifier> token = - new Token<ContainerTokenIdentifier>(newTokenId.getBytes(), passowrd, - new Text(containerToken.getKind()), new Text( - containerToken.getService())); - - unauthorizedUser.addToken(token); - unauthorizedUser.doAs(new PrivilegedAction<Void>() { - @Override - public Void run() { - ContainerManagementProtocol client = (ContainerManagementProtocol) yarnRPC.getProxy( - ContainerManagementProtocol.class, NetUtils - .createSocketAddr(allocatedContainer.getNodeId().toString()), - conf); - - LOG.info("Going to contact NM with expired token"); - ContainerLaunchContext context = createContainerLaunchContextForTest(newTokenId); - StartContainerRequest request = - Records.newRecord(StartContainerRequest.class); - request.setContainerLaunchContext(context); - allocatedContainer.setContainerToken(BuilderUtils.newContainerToken( - allocatedContainer.getNodeId(), passowrd, newTokenId)); - request.setContainerToken(allocatedContainer.getContainerToken()); - - //Calling startContainer with an expired token. - try { - client.startContainer(request); - fail("Connection initiation with expired " - + "token is expected to fail."); - } catch (Throwable t) { - LOG.info("Got exception : ", t); - Assert.assertTrue(t.getMessage().contains( - "This token is expired. current time is")); - } - - // Try stopping a container - should not get an expiry error. - StopContainerRequest stopRequest = Records.newRecord(StopContainerRequest.class); - stopRequest.setContainerId(newTokenId.getContainerID()); - try { - client.stopContainer(stopRequest); - } catch (Throwable t) { - fail("Stop Container call should have succeeded"); - } - - return null; - } - }); - /////////// End of testing calls with expired tokens - - KillApplicationRequest request = Records - .newRecord(KillApplicationRequest.class); - request.setApplicationId(appID); - resourceManager.getClientRMService().forceKillApplication(request); - } - - private ApplicationMasterProtocol submitAndRegisterApplication( - ResourceManager resourceManager, final YarnRPC yarnRPC, - ApplicationId appID) throws IOException, - UnsupportedFileSystemException, YarnException, - InterruptedException { - - // Use ping to simulate sleep on Windows. - List<String> cmd = Shell.WINDOWS ? - Arrays.asList("ping", "-n", "100", "127.0.0.1", ">nul") : - Arrays.asList("sleep", "100"); - - ContainerLaunchContext amContainer = - BuilderUtils.newContainerLaunchContext( - Collections.<String, LocalResource> emptyMap(), - new HashMap<String, String>(), cmd, - new HashMap<String, ByteBuffer>(), null, - new HashMap<ApplicationAccessType, String>()); - - ApplicationSubmissionContext appSubmissionContext = recordFactory - .newRecordInstance(ApplicationSubmissionContext.class); - appSubmissionContext.setApplicationId(appID); - appSubmissionContext.setAMContainerSpec(amContainer); - appSubmissionContext.setResource(BuilderUtils.newResource(1024, 1)); - - SubmitApplicationRequest submitRequest = recordFactory - .newRecordInstance(SubmitApplicationRequest.class); - submitRequest.setApplicationSubmissionContext(appSubmissionContext); - resourceManager.getClientRMService().submitApplication(submitRequest); - - // Wait till container gets allocated for AM - int waitCounter = 0; - RMApp app = resourceManager.getRMContext().getRMApps().get(appID); - RMAppAttempt appAttempt = app == null ? null : app.getCurrentAppAttempt(); - RMAppAttemptState state = appAttempt == null ? null : appAttempt - .getAppAttemptState(); - while ((app == null || appAttempt == null || state == null || !state - .equals(RMAppAttemptState.LAUNCHED)) - && waitCounter++ != 20) { - LOG.info("Waiting for applicationAttempt to be created.. "); - Thread.sleep(1000); - app = resourceManager.getRMContext().getRMApps().get(appID); - appAttempt = app == null ? null : app.getCurrentAppAttempt(); - state = appAttempt == null ? null : appAttempt.getAppAttemptState(); - } - Assert.assertNotNull(app); - Assert.assertNotNull(appAttempt); - Assert.assertNotNull(state); - Assert.assertEquals(RMAppAttemptState.LAUNCHED, state); - - UserGroupInformation currentUser = UserGroupInformation.createRemoteUser( - appAttempt.getAppAttemptId().toString()); - - // Ask for a container from the RM - final InetSocketAddress schedulerAddr = - resourceManager.getApplicationMasterService().getBindAddress(); - if (UserGroupInformation.isSecurityEnabled()) { - AMRMTokenIdentifier appTokenIdentifier = new AMRMTokenIdentifier( - appAttempt.getAppAttemptId()); - AMRMTokenSecretManager appTokenSecretManager = - new AMRMTokenSecretManager(conf); - appTokenSecretManager.setMasterKey(resourceManager - .getAMRMTokenSecretManager().getMasterKey()); - Token<AMRMTokenIdentifier> appToken = - new Token<AMRMTokenIdentifier>(appTokenIdentifier, - appTokenSecretManager); - SecurityUtil.setTokenService(appToken, schedulerAddr); - currentUser.addToken(appToken); - } + /* + * We need to check for containerToken (authorization). + * Here we will be assuming that we have valid NMToken + * 1) ContainerToken used is expired. + * 2) ContainerToken is tampered (resource is modified). + */ + NMTokenSecretManagerInRM nmTokenSecretManagerInRM = + yarnCluster.getResourceManager().getRMContext() + .getNMTokenSecretManager(); + ApplicationId appId = ApplicationId.newInstance(1, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + ContainerId cId = ContainerId.newInstance(appAttemptId, 0); + NodeManager nm = yarnCluster.getNodeManager(0); + NMTokenSecretManagerInNM nmTokenSecretManagerInNM = + nm.getNMContext().getNMTokenSecretManager(); + String user = "test"; - ApplicationMasterProtocol scheduler = currentUser - .doAs(new PrivilegedAction<ApplicationMasterProtocol>() { - @Override - public ApplicationMasterProtocol run() { - return (ApplicationMasterProtocol) yarnRPC.getProxy(ApplicationMasterProtocol.class, - schedulerAddr, conf); - } - }); + waitForNMToReceiveNMTokenKey(nmTokenSecretManagerInNM, nm); - // Register the appMaster - RegisterApplicationMasterRequest request = recordFactory - .newRecordInstance(RegisterApplicationMasterRequest.class); - request.setApplicationAttemptId(resourceManager.getRMContext() - .getRMApps().get(appID).getCurrentAppAttempt().getAppAttemptId()); - scheduler.registerApplicationMaster(request); - return scheduler; - } - - private Container requestAndGetContainer(ApplicationMasterProtocol scheduler, - ApplicationId appID) throws YarnException, InterruptedException, - IOException { - - // Request a container allocation. - List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); - ask.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), - ResourceRequest.ANY, BuilderUtils.newResource(1024, 1), 1)); - - AllocateRequest allocateRequest = AllocateRequest.newInstance( - BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask, - new ArrayList<ContainerId>(), null); - List<Container> allocatedContainers = scheduler.allocate(allocateRequest) - .getAllocatedContainers(); - - // Modify ask to request no more. - allocateRequest.setAskList(new ArrayList<ResourceRequest>()); - - int waitCounter = 0; - while ((allocatedContainers == null || allocatedContainers.size() == 0) - && waitCounter++ != 20) { - LOG.info("Waiting for container to be allocated.."); - Thread.sleep(1000); - allocateRequest.setResponseId(allocateRequest.getResponseId() + 1); - allocatedContainers = scheduler.allocate(allocateRequest) - .getAllocatedContainers(); - } - - Assert.assertNotNull("Container is not allocted!", allocatedContainers); - Assert.assertEquals("Didn't get one container!", 1, allocatedContainers - .size()); - - return allocatedContainers.get(0); - } - - private ContainerLaunchContext createContainerLaunchContextForTest( - ContainerTokenIdentifier tokenId) { - ContainerLaunchContext context = - BuilderUtils.newContainerLaunchContext( - new HashMap<String, LocalResource>(), - new HashMap<String, String>(), new ArrayList<String>(), - new HashMap<String, ByteBuffer>(), null, - new HashMap<ApplicationAccessType, String>()); - return context; + NodeId nodeId = nm.getNMContext().getNodeId(); + + // Both id should be equal. + Assert.assertEquals(nmTokenSecretManagerInNM.getCurrentKey().getKeyId(), + nmTokenSecretManagerInRM.getCurrentKey().getKeyId()); + + // Creating a tampered Container Token + RMContainerTokenSecretManager containerTokenSecretManager = + yarnCluster.getResourceManager().getRMContainerTokenSecretManager(); + + RMContainerTokenSecretManager tamperedContainerTokenSecretManager = + new RMContainerTokenSecretManager(conf); + tamperedContainerTokenSecretManager.rollMasterKey(); + do { + tamperedContainerTokenSecretManager.rollMasterKey(); + tamperedContainerTokenSecretManager.activateNextMasterKey(); + } while (containerTokenSecretManager.getCurrentKey().getKeyId() + == tamperedContainerTokenSecretManager.getCurrentKey().getKeyId()); + + Resource r = Resource.newInstance(1230, 2); + // Creating modified containerToken + Token containerToken = + tamperedContainerTokenSecretManager.createContainerToken(cId, nodeId, + user, r); + Token nmToken = + nmTokenSecretManagerInRM.createNMToken(appAttemptId, nodeId, user); + YarnRPC rpc = YarnRPC.create(conf); + StringBuilder sb = new StringBuilder("Given Container "); + sb.append(cId); + sb.append(" seems to have an illegally generated token."); + Assert.assertTrue(testStartContainer(rpc, appAttemptId, nodeId, + containerToken, nmToken, true).contains(sb.toString())); } }
