Author: cos Date: Fri May 31 22:14:01 2013 New Revision: 1488412 URL: http://svn.apache.org/r1488412 Log: MAPREDUCE-5240 inside of FileOutputCommitter the initialized Credentials cache appears to be empty
Modified: hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Modified: hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/CHANGES.txt?rev=1488412&r1=1488411&r2=1488412&view=diff ============================================================================== --- hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/CHANGES.txt Fri May 31 22:14:01 2013 @@ -28,6 +28,9 @@ Release 2.0.4-alpha - UNRELEASED MAPREDUCE-5094. Disabled memory monitoring by default in MiniMRYarnCluster to avoid some downstream tests failing. (Siddharth Seth via vinodkv) + MAPREDUCE-5240. inside of FileOutputCommitter the initialized Credentials + cache appears to be empty (vinodkv, rvs via cos) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1488412&r1=1488411&r2=1488412&view=diff ============================================================================== --- hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri May 31 22:14:01 2013 @@ -1214,7 +1214,7 @@ public class MRAppMaster extends Composi Integer.parseInt(nodeHttpPortString), appSubmitTime); ShutdownHookManager.get().addShutdownHook( new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY); - YarnConfiguration conf = new YarnConfiguration(new JobConf()); + JobConf conf = new JobConf(new YarnConfiguration()); conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE)); String jobUserName = System .getenv(ApplicationConstants.Environment.USER.name()); @@ -1261,11 +1261,14 @@ public class MRAppMaster extends Composi } protected static void initAndStartAppMaster(final MRAppMaster appMaster, - final YarnConfiguration conf, String jobUserName) throws IOException, + final JobConf conf, String jobUserName) throws IOException, InterruptedException { UserGroupInformation.setConfiguration(conf); + Credentials credentials = + UserGroupInformation.getCurrentUser().getCredentials(); UserGroupInformation appMasterUgi = UserGroupInformation .createRemoteUser(jobUserName); + conf.getCredentials().addAll(credentials); appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { Modified: hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1488412&r1=1488411&r2=1488412&view=diff ============================================================================== --- hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original) +++ hadoop/common/branches/branch-2.0.5-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Fri May 31 22:14:01 2013 @@ -21,28 +21,51 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import junit.framework.Assert; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent; +import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -50,13 +73,20 @@ import org.junit.Test; public class TestMRAppMaster { private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class); static String stagingDir = "staging/"; + private static FileContext localFS = null; + private static final File testDir = new File("target", + TestMRAppMaster.class.getName() + "-tmpDir").getAbsoluteFile(); @BeforeClass - public static void setup() { + public static void setup() throws AccessControlException, + FileNotFoundException, IllegalArgumentException, IOException { //Do not error out if metrics are inited multiple times DefaultMetricsSystem.setMiniClusterMode(true); File dir = new File(stagingDir); stagingDir = dir.getAbsolutePath(); + localFS = FileContext.getLocalFSFileContext(); + localFS.delete(new Path(testDir.getAbsolutePath()), true); + testDir.mkdir(); } @Before @@ -81,7 +111,7 @@ public class TestMRAppMaster { MRAppMasterTest appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, System.currentTimeMillis()); - YarnConfiguration conf = new YarnConfiguration(); + JobConf conf = new JobConf(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR @@ -94,7 +124,7 @@ public class TestMRAppMaster { String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; String containerIdStr = "container_1317529182569_0004_000002_1"; String userName = "TestAppMasterUser"; - YarnConfiguration conf = new YarnConfiguration(); + JobConf conf = new JobConf(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); ApplicationAttemptId applicationAttemptId = ConverterUtils .toApplicationAttemptId(applicationAttemptIdStr); @@ -107,7 +137,7 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), false); + System.currentTimeMillis(), false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -128,7 +158,7 @@ public class TestMRAppMaster { String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; String containerIdStr = "container_1317529182569_0004_000002_1"; String userName = "TestAppMasterUser"; - YarnConfiguration conf = new YarnConfiguration(); + JobConf conf = new JobConf(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); ApplicationAttemptId applicationAttemptId = ConverterUtils .toApplicationAttemptId(applicationAttemptIdStr); @@ -142,7 +172,7 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), false); + System.currentTimeMillis(), false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -163,7 +193,7 @@ public class TestMRAppMaster { String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; String containerIdStr = "container_1317529182569_0004_000002_1"; String userName = "TestAppMasterUser"; - YarnConfiguration conf = new YarnConfiguration(); + JobConf conf = new JobConf(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); ApplicationAttemptId applicationAttemptId = ConverterUtils .toApplicationAttemptId(applicationAttemptIdStr); @@ -177,7 +207,7 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), false); + System.currentTimeMillis(), false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -198,7 +228,7 @@ public class TestMRAppMaster { String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; String containerIdStr = "container_1317529182569_0004_000002_1"; String userName = "TestAppMasterUser"; - YarnConfiguration conf = new YarnConfiguration(); + JobConf conf = new JobConf(); conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); ApplicationAttemptId applicationAttemptId = ConverterUtils .toApplicationAttemptId(applicationAttemptIdStr); @@ -212,7 +242,7 @@ public class TestMRAppMaster { ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); MRAppMaster appMaster = new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, - System.currentTimeMillis(), false); + System.currentTimeMillis(), false, false); boolean caught = false; try { MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); @@ -228,39 +258,155 @@ public class TestMRAppMaster { assertEquals(JobStateInternal.ERROR, appMaster.forcedState); appMaster.stop(); } + + // A dirty hack to modify the env of the current JVM itself - Dirty, but + // should be okay for testing. + @SuppressWarnings({ "rawtypes", "unchecked" }) + private static void setNewEnvironmentHack(Map<String, String> newenv) + throws Exception { + try { + Class<?> cl = Class.forName("java.lang.ProcessEnvironment"); + Field field = cl.getDeclaredField("theEnvironment"); + field.setAccessible(true); + Map<String, String> env = (Map<String, String>) field.get(null); + env.clear(); + env.putAll(newenv); + Field ciField = cl.getDeclaredField("theCaseInsensitiveEnvironment"); + ciField.setAccessible(true); + Map<String, String> cienv = (Map<String, String>) ciField.get(null); + cienv.clear(); + cienv.putAll(newenv); + } catch (NoSuchFieldException e) { + Class[] classes = Collections.class.getDeclaredClasses(); + Map<String, String> env = System.getenv(); + for (Class cl : classes) { + if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { + Field field = cl.getDeclaredField("m"); + field.setAccessible(true); + Object obj = field.get(env); + Map<String, String> map = (Map<String, String>) obj; + map.clear(); + map.putAll(newenv); + } + } + } + } + + @Test + public void testMRAppMasterCredentials() throws Exception { + + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + + // Simulate credentials passed to AM via client->RM->NM + Credentials credentials = new Credentials(); + byte[] identifier = "MyIdentifier".getBytes(); + byte[] password = "MyPassword".getBytes(); + Text kind = new Text("MyTokenKind"); + Text service = new Text("host:port"); + Token<? extends TokenIdentifier> myToken = + new Token<TokenIdentifier>(identifier, password, kind, service); + Text tokenAlias = new Text("myToken"); + credentials.addToken(tokenAlias, myToken); + Text keyAlias = new Text("mySecretKeyAlias"); + credentials.addSecretKey(keyAlias, "mySecretKey".getBytes()); + Token<? extends TokenIdentifier> storedToken = + credentials.getToken(tokenAlias); + + JobConf conf = new JobConf(); + + Path tokenFilePath = new Path(testDir.getAbsolutePath(), "tokens-file"); + Map<String, String> newEnv = new HashMap<String, String>(); + newEnv.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, tokenFilePath + .toUri().getPath()); + setNewEnvironmentHack(newEnv); + credentials.writeTokenStorageFile(tokenFilePath, conf); + + ApplicationId appId = BuilderUtils.newApplicationId(12345, 56); + ApplicationAttemptId applicationAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 1); + ContainerId containerId = + BuilderUtils.newContainerId(applicationAttemptId, 546); + String userName = UserGroupInformation.getCurrentUser().getShortUserName(); + + // Create staging dir, so MRAppMaster doesn't barf. + File stagingDir = + new File(MRApps.getStagingAreaDir(conf, userName).toString()); + stagingDir.mkdirs(); + + // Set login-user to null as that is how real world MRApp starts with. + // This is null is the reason why token-file is read by UGI. + UserGroupInformation.setLoginUser(null); + + MRAppMasterTest appMaster = + new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1, + System.currentTimeMillis(), false, true); + MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); + + // Now validate the credentials + Credentials appMasterCreds = conf.getCredentials(); + Assert.assertNotNull(appMasterCreds); + Assert.assertEquals(1, appMasterCreds.numberOfSecretKeys()); + Assert.assertEquals(1, appMasterCreds.numberOfTokens()); + + // Validate the tokens + Token<? extends TokenIdentifier> usedToken = + appMasterCreds.getToken(tokenAlias); + Assert.assertNotNull(usedToken); + Assert.assertEquals(storedToken, usedToken); + + // Validate the keys + byte[] usedKey = appMasterCreds.getSecretKey(keyAlias); + Assert.assertNotNull(usedKey); + Assert.assertEquals("mySecretKey", new String(usedKey)); + + // The credentials should also be added to conf so that OuputCommitter can + // access it + Credentials confCredentials = conf.getCredentials(); + Assert.assertEquals(1, confCredentials.numberOfSecretKeys()); + Assert.assertEquals(1, confCredentials.numberOfTokens()); + Assert.assertEquals(storedToken, confCredentials.getToken(tokenAlias)); + Assert.assertEquals("mySecretKey", + new String(confCredentials.getSecretKey(keyAlias))); + } } class MRAppMasterTest extends MRAppMaster { Path stagingDirPath; private Configuration conf; - private boolean overrideInitAndStart; + private boolean overrideInit; + private boolean overrideStart; ContainerAllocator mockContainerAllocator; + CommitterEventHandler mockCommitterEventHandler; + RMHeartbeatHandler mockRMHeartbeatHandler; public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String host, int port, int httpPort, long submitTime) { - this(applicationAttemptId, containerId, host, port, httpPort, submitTime, - true); + this(applicationAttemptId, containerId, host, port, httpPort, + submitTime, true, true); } public MRAppMasterTest(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String host, int port, int httpPort, - long submitTime, boolean overrideInitAndStart) { + long submitTime, boolean overrideInit, boolean overrideStart) { super(applicationAttemptId, containerId, host, port, httpPort, submitTime); - this.overrideInitAndStart = overrideInitAndStart; + this.overrideInit = overrideInit; + this.overrideStart = overrideStart; mockContainerAllocator = mock(ContainerAllocator.class); + mockCommitterEventHandler = mock(CommitterEventHandler.class); + mockRMHeartbeatHandler = mock(RMHeartbeatHandler.class); } @Override public void init(Configuration conf) { - if (overrideInitAndStart) { - this.conf = conf; - } else { + if (!overrideInit) { super.init(conf); } + this.conf = conf; } - - @Override + + @Override protected void downloadTokensAndSetupUGI(Configuration conf) { try { this.currentUser = UserGroupInformation.getCurrentUser(); @@ -268,7 +414,7 @@ class MRAppMasterTest extends MRAppMaste throw new YarnException(e); } } - + @Override protected ContainerAllocator createContainerAllocator( final ClientService clientService, final AppContext context) { @@ -276,8 +422,19 @@ class MRAppMasterTest extends MRAppMaste } @Override + protected EventHandler<CommitterEvent> createCommitterEventHandler( + AppContext context, OutputCommitter committer) { + return mockCommitterEventHandler; + } + + @Override + protected RMHeartbeatHandler getRMHeartbeatHandler() { + return mockRMHeartbeatHandler; + } + + @Override public void start() { - if (overrideInitAndStart) { + if (overrideStart) { try { String user = UserGroupInformation.getCurrentUser().getShortUserName(); stagingDirPath = MRApps.getStagingAreaDir(conf, user);