Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Fri Jul 25 20:33:09 2014 @@ -169,6 +169,15 @@ public class NodeManager extends Composi } } + private void recoverTokens(NMTokenSecretManagerInNM nmTokenSecretManager, + NMContainerTokenSecretManager containerTokenSecretManager) + throws IOException { + if (nmStore.canRecover()) { + nmTokenSecretManager.recover(); + containerTokenSecretManager.recover(); + } + } + @Override protected void serviceInit(Configuration conf) throws Exception { @@ -181,10 +190,12 @@ public class NodeManager extends Composi initAndStartRecoveryStore(conf); NMContainerTokenSecretManager containerTokenSecretManager = - new NMContainerTokenSecretManager(conf); + new NMContainerTokenSecretManager(conf, nmStore); NMTokenSecretManagerInNM nmTokenSecretManager = - new NMTokenSecretManagerInNM(); + new NMTokenSecretManagerInNM(nmStore); + + recoverTokens(nmTokenSecretManager, containerTokenSecretManager); this.aclsManager = new ApplicationACLsManager(conf);
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java Fri Jul 25 20:33:09 2014 @@ -35,11 +35,19 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import 
org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.util.ConverterUtils; import org.fusesource.leveldbjni.JniDBFactory; @@ -50,14 +58,18 @@ import org.iq80.leveldb.Logger; import org.iq80.leveldb.Options; import org.iq80.leveldb.WriteBatch; +import com.google.common.annotations.VisibleForTesting; + public class NMLeveldbStateStoreService extends NMStateStoreService { public static final Log LOG = LogFactory.getLog(NMLeveldbStateStoreService.class); private static final String DB_NAME = "yarn-nm-state"; - private static final String DB_SCHEMA_VERSION_KEY = "schema-version"; - private static final String DB_SCHEMA_VERSION = "1.0"; + private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version"; + + private static final NMDBSchemaVersion CURRENT_VERSION_INFO = NMDBSchemaVersion + .newInstance(1, 0); private static final String DELETION_TASK_KEY_PREFIX = "DeletionService/deltask_"; @@ -72,6 +84,20 @@ public class NMLeveldbStateStoreService private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/"; private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/"; + private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; + private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; + private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/"; + private static final String NM_TOKENS_CURRENT_MASTER_KEY = + NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX; + private static final String NM_TOKENS_PREV_MASTER_KEY = + NM_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX; + private static final String CONTAINER_TOKENS_KEY_PREFIX = + "ContainerTokens/"; + private static final String CONTAINER_TOKENS_CURRENT_MASTER_KEY = + 
CONTAINER_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX; + private static final String CONTAINER_TOKENS_PREV_MASTER_KEY = + CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX; + private DB db; public NMLeveldbStateStoreService() { @@ -122,7 +148,7 @@ public class NMLeveldbStateStoreService key.substring(0, userEndPos+1))); } } catch (DBException e) { - throw new IOException(e.getMessage(), e); + throw new IOException(e); } finally { if (iter != null) { iter.close(); @@ -241,7 +267,7 @@ public class NMLeveldbStateStoreService try { db.put(bytes(key), proto.toByteArray()); } catch (DBException e) { - throw new IOException(e.getMessage(), e); + throw new IOException(e); } } @@ -264,7 +290,7 @@ public class NMLeveldbStateStoreService batch.close(); } } catch (DBException e) { - throw new IOException(e.getMessage(), e); + throw new IOException(e); } } @@ -287,7 +313,7 @@ public class NMLeveldbStateStoreService batch.close(); } } catch (DBException e) { - throw new IOException(e.getMessage(), e); + throw new IOException(e); } } @@ -336,7 +362,7 @@ public class NMLeveldbStateStoreService DeletionServiceDeleteTaskProto.parseFrom(entry.getValue())); } } catch (DBException e) { - throw new IOException(e.getMessage(), e); + throw new IOException(e); } finally { if (iter != null) { iter.close(); @@ -352,7 +378,7 @@ public class NMLeveldbStateStoreService try { db.put(bytes(key), taskProto.toByteArray()); } catch (DBException e) { - throw new IOException(e.getMessage(), e); + throw new IOException(e); } } @@ -362,7 +388,178 @@ public class NMLeveldbStateStoreService try { db.delete(bytes(key)); } catch (DBException e) { - throw new IOException(e.getMessage(), e); + throw new IOException(e); + } + } + + + @Override + public RecoveredNMTokensState loadNMTokensState() throws IOException { + RecoveredNMTokensState state = new RecoveredNMTokensState(); + state.applicationMasterKeys = + new HashMap<ApplicationAttemptId, MasterKey>(); + LeveldbIterator iter = null; + try { + iter = 
new LeveldbIterator(db); + iter.seek(bytes(NM_TOKENS_KEY_PREFIX)); + while (iter.hasNext()) { + Entry<byte[], byte[]> entry = iter.next(); + String fullKey = asString(entry.getKey()); + if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) { + break; + } + String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length()); + if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) { + state.currentMasterKey = parseMasterKey(entry.getValue()); + } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) { + state.previousMasterKey = parseMasterKey(entry.getValue()); + } else if (key.startsWith( + ApplicationAttemptId.appAttemptIdStrPrefix)) { + ApplicationAttemptId attempt; + try { + attempt = ConverterUtils.toApplicationAttemptId(key); + } catch (IllegalArgumentException e) { + throw new IOException("Bad application master key state for " + + fullKey, e); + } + state.applicationMasterKeys.put(attempt, + parseMasterKey(entry.getValue())); + } + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + return state; + } + + @Override + public void storeNMTokenCurrentMasterKey(MasterKey key) + throws IOException { + storeMasterKey(NM_TOKENS_CURRENT_MASTER_KEY, key); + } + + @Override + public void storeNMTokenPreviousMasterKey(MasterKey key) + throws IOException { + storeMasterKey(NM_TOKENS_PREV_MASTER_KEY, key); + } + + @Override + public void storeNMTokenApplicationMasterKey( + ApplicationAttemptId attempt, MasterKey key) throws IOException { + storeMasterKey(NM_TOKENS_KEY_PREFIX + attempt, key); + } + + @Override + public void removeNMTokenApplicationMasterKey( + ApplicationAttemptId attempt) throws IOException { + String key = NM_TOKENS_KEY_PREFIX + attempt; + try { + db.delete(bytes(key)); + } catch (DBException e) { + throw new IOException(e); + } + } + + private MasterKey parseMasterKey(byte[] keyData) throws IOException { + return new MasterKeyPBImpl(MasterKeyProto.parseFrom(keyData)); + } + + private void storeMasterKey(String dbKey, 
MasterKey key) + throws IOException { + MasterKeyPBImpl pb = (MasterKeyPBImpl) key; + try { + db.put(bytes(dbKey), pb.getProto().toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + + @Override + public RecoveredContainerTokensState loadContainerTokensState() + throws IOException { + RecoveredContainerTokensState state = new RecoveredContainerTokensState(); + state.activeTokens = new HashMap<ContainerId, Long>(); + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX)); + final int containerTokensKeyPrefixLength = + CONTAINER_TOKENS_KEY_PREFIX.length(); + while (iter.hasNext()) { + Entry<byte[], byte[]> entry = iter.next(); + String fullKey = asString(entry.getKey()); + if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) { + break; + } + String key = fullKey.substring(containerTokensKeyPrefixLength); + if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) { + state.currentMasterKey = parseMasterKey(entry.getValue()); + } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) { + state.previousMasterKey = parseMasterKey(entry.getValue()); + } else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) { + loadContainerToken(state, fullKey, key, entry.getValue()); + } + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + return state; + } + + private static void loadContainerToken(RecoveredContainerTokensState state, + String key, String containerIdStr, byte[] value) throws IOException { + ContainerId containerId; + Long expTime; + try { + containerId = ConverterUtils.toContainerId(containerIdStr); + expTime = Long.parseLong(asString(value)); + } catch (IllegalArgumentException e) { + throw new IOException("Bad container token state for " + key, e); + } + state.activeTokens.put(containerId, expTime); + } + + @Override + public void storeContainerTokenCurrentMasterKey(MasterKey key) + throws IOException { + 
storeMasterKey(CONTAINER_TOKENS_CURRENT_MASTER_KEY, key); + } + + @Override + public void storeContainerTokenPreviousMasterKey(MasterKey key) + throws IOException { + storeMasterKey(CONTAINER_TOKENS_PREV_MASTER_KEY, key); + } + + @Override + public void storeContainerToken(ContainerId containerId, Long expTime) + throws IOException { + String key = CONTAINER_TOKENS_KEY_PREFIX + containerId; + try { + db.put(bytes(key), bytes(expTime.toString())); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeContainerToken(ContainerId containerId) + throws IOException { + String key = CONTAINER_TOKENS_KEY_PREFIX + containerId; + try { + db.delete(bytes(key)); + } catch (DBException e) { + throw new IOException(e); } } @@ -376,22 +573,16 @@ public class NMLeveldbStateStoreService options.logger(new LeveldbLogger()); LOG.info("Using state database at " + storeRoot + " for recovery"); File dbfile = new File(storeRoot.toString()); - byte[] schemaVersionData = null; try { db = JniDBFactory.factory.open(dbfile, options); - try { - schemaVersionData = db.get(bytes(DB_SCHEMA_VERSION_KEY)); - } catch (DBException e) { - throw new IOException(e.getMessage(), e); - } } catch (NativeDB.DBException e) { if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { LOG.info("Creating state database at " + dbfile); options.createIfMissing(true); try { db = JniDBFactory.factory.open(dbfile, options); - schemaVersionData = bytes(DB_SCHEMA_VERSION); - db.put(bytes(DB_SCHEMA_VERSION_KEY), schemaVersionData); + // store version + storeVersion(); } catch (DBException dbErr) { throw new IOException(dbErr.getMessage(), dbErr); } @@ -399,16 +590,7 @@ public class NMLeveldbStateStoreService throw e; } } - if (schemaVersionData != null) { - String schemaVersion = asString(schemaVersionData); - // only support exact schema matches for now - if (!DB_SCHEMA_VERSION.equals(schemaVersion)) { - throw new IOException("Incompatible state database schema, 
found " - + schemaVersion + " expected " + DB_SCHEMA_VERSION); - } - } else { - throw new IOException("State database schema version not found"); - } + checkVersion(); } private Path createStorageDir(Configuration conf) throws IOException { @@ -433,4 +615,68 @@ public class NMLeveldbStateStoreService LOG.info(message); } } + + + NMDBSchemaVersion loadVersion() throws IOException { + byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. + if (data == null || data.length == 0) { + return NMDBSchemaVersion.newInstance(1, 0); + } + NMDBSchemaVersion version = + new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data)); + return version; + } + + private void storeVersion() throws IOException { + dbStoreVersion(CURRENT_VERSION_INFO); + } + + // Only used for test + @VisibleForTesting + void storeVersion(NMDBSchemaVersion state) throws IOException { + dbStoreVersion(state); + } + + private void dbStoreVersion(NMDBSchemaVersion state) throws IOException { + String key = DB_SCHEMA_VERSION_KEY; + byte[] data = + ((NMDBSchemaVersionPBImpl) state).getProto().toByteArray(); + try { + db.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + NMDBSchemaVersion getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + /** + * 1) Versioning scheme: major.minor, e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. + * 2) Any incompatible change of state-store is a major upgrade, and any + * compatible change of state-store is a minor upgrade. + * 3) Within a minor upgrade, say 1.1 to 1.2: + * overwrite the version info and proceed as normal. + * 4) Within a major upgrade, say 1.2 to 2.0: + * throw an exception and instruct the user to use a separate upgrade tool to + * upgrade NM state or remove incompatible old state. 
+ */ + private void checkVersion() throws IOException { + NMDBSchemaVersion loadedVersion = loadVersion(); + LOG.info("Loaded NM state version info " + loadedVersion); + if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { + return; + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing NM state version info " + getCurrentVersion()); + storeVersion(); + } else { + throw new IOException( + "Incompatible version for NM state: expecting NM state version " + + getCurrentVersion() + ", but loading version " + loadedVersion); + } + } + } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java Fri Jul 25 20:33:09 2014 @@ -22,10 +22,13 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import 
org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.server.api.records.MasterKey; // The state store to use when state isn't being stored public class NMNullStateStoreService extends NMStateStoreService { @@ -78,6 +81,59 @@ public class NMNullStateStoreService ext } @Override + public RecoveredNMTokensState loadNMTokensState() throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeNMTokenCurrentMasterKey(MasterKey key) + throws IOException { + } + + @Override + public void storeNMTokenPreviousMasterKey(MasterKey key) + throws IOException { + } + + @Override + public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt, + MasterKey key) throws IOException { + } + + @Override + public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt) + throws IOException { + } + + @Override + public RecoveredContainerTokensState loadContainerTokensState() + throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeContainerTokenCurrentMasterKey(MasterKey key) + throws IOException { + } + + @Override + public void storeContainerTokenPreviousMasterKey(MasterKey key) + throws IOException { + } + + @Override + public void storeContainerToken(ContainerId containerId, + Long expirationTime) throws IOException { + } + + @Override + public void removeContainerToken(ContainerId containerId) + throws IOException { + } + + @Override protected void initStorage(Configuration conf) throws IOException { } Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java Fri Jul 25 20:33:09 2014 @@ -29,10 +29,13 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.server.api.records.MasterKey; @Private @Unstable @@ -100,6 +103,42 @@ public abstract class NMStateStoreServic } } + public static class RecoveredNMTokensState { + MasterKey currentMasterKey; + MasterKey previousMasterKey; + Map<ApplicationAttemptId, MasterKey> applicationMasterKeys; + + public MasterKey getCurrentMasterKey() { + return currentMasterKey; + } + + public MasterKey getPreviousMasterKey() { + return previousMasterKey; + } + + public Map<ApplicationAttemptId, MasterKey> getApplicationMasterKeys() { + return applicationMasterKeys; + } + } + + public static class RecoveredContainerTokensState { + MasterKey currentMasterKey; + MasterKey previousMasterKey; + Map<ContainerId, Long> activeTokens; + + public MasterKey getCurrentMasterKey() { + return currentMasterKey; + } + + public MasterKey getPreviousMasterKey() { + return previousMasterKey; + } + + public Map<ContainerId, Long> getActiveTokens() { + return activeTokens; + } + } + /** Initialize the state storage */ @Override public void serviceInit(Configuration conf) throws IOException { @@ -173,6 +212,38 @@ public abstract class NMStateStoreServic public abstract void removeDeletionTask(int taskId) throws IOException; + public abstract RecoveredNMTokensState loadNMTokensState() + throws IOException; + + public abstract void storeNMTokenCurrentMasterKey(MasterKey key) + throws IOException; + + public abstract 
void storeNMTokenPreviousMasterKey(MasterKey key) + throws IOException; + + public abstract void storeNMTokenApplicationMasterKey( + ApplicationAttemptId attempt, MasterKey key) throws IOException; + + public abstract void removeNMTokenApplicationMasterKey( + ApplicationAttemptId attempt) throws IOException; + + + public abstract RecoveredContainerTokensState loadContainerTokensState() + throws IOException; + + public abstract void storeContainerTokenCurrentMasterKey(MasterKey key) + throws IOException; + + public abstract void storeContainerTokenPreviousMasterKey(MasterKey key) + throws IOException; + + public abstract void storeContainerToken(ContainerId containerId, + Long expirationTime) throws IOException; + + public abstract void removeContainerToken(ContainerId containerId) + throws IOException; + + protected abstract void initStorage(Configuration conf) throws IOException; protected abstract void startStorage() throws IOException; Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java 
Fri Jul 25 20:33:09 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.security; +import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -33,6 +34,9 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.MasterKeyData; @@ -49,14 +53,74 @@ public class NMContainerTokenSecretManag private MasterKeyData previousMasterKey; private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker; - + private final NMStateStoreService stateStore; private String nodeHostAddr; public NMContainerTokenSecretManager(Configuration conf) { + this(conf, new NMNullStateStoreService()); + } + + public NMContainerTokenSecretManager(Configuration conf, + NMStateStoreService stateStore) { super(conf); recentlyStartedContainerTracker = new TreeMap<Long, List<ContainerId>>(); + this.stateStore = stateStore; + } + + public synchronized void recover() + throws IOException { + RecoveredContainerTokensState state = + stateStore.loadContainerTokensState(); + MasterKey key = state.getCurrentMasterKey(); + if (key != null) { + super.currentMasterKey = + new MasterKeyData(key, createSecretKey(key.getBytes().array())); + } + + key = state.getPreviousMasterKey(); + if (key != null) { + previousMasterKey = + new MasterKeyData(key, createSecretKey(key.getBytes().array())); + } + + // restore the serial number from the current master key + if (super.currentMasterKey 
!= null) { + super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; + } + + for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) { + ContainerId containerId = entry.getKey(); + Long expTime = entry.getValue(); + List<ContainerId> containerList = + recentlyStartedContainerTracker.get(expTime); + if (containerList == null) { + containerList = new ArrayList<ContainerId>(); + recentlyStartedContainerTracker.put(expTime, containerList); + } + if (!containerList.contains(containerId)) { + containerList.add(containerId); + } + } + } + + private void updateCurrentMasterKey(MasterKeyData key) { + super.currentMasterKey = key; + try { + stateStore.storeContainerTokenCurrentMasterKey(key.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to update current master key in state store", e); + } + } + + private void updatePreviousMasterKey(MasterKeyData key) { + previousMasterKey = key; + try { + stateStore.storeContainerTokenPreviousMasterKey(key.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to update previous master key in state store", e); + } } /** @@ -68,21 +132,16 @@ public class NMContainerTokenSecretManag */ @Private public synchronized void setMasterKey(MasterKey masterKeyRecord) { - LOG.info("Rolling master-key for container-tokens, got key with id " - + masterKeyRecord.getKeyId()); - if (super.currentMasterKey == null) { - super.currentMasterKey = - new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord - .getBytes().array())); - } else { - if (super.currentMasterKey.getMasterKey().getKeyId() != masterKeyRecord - .getKeyId()) { - // Update keys only if the key has changed. - this.previousMasterKey = super.currentMasterKey; - super.currentMasterKey = - new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord - .getBytes().array())); + // Update keys only if the key has changed. 
+ if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey() + .getKeyId() != masterKeyRecord.getKeyId()) { + LOG.info("Rolling master-key for container-tokens, got key with id " + + masterKeyRecord.getKeyId()); + if (super.currentMasterKey != null) { + updatePreviousMasterKey(super.currentMasterKey); } + updateCurrentMasterKey(new MasterKeyData(masterKeyRecord, + createSecretKey(masterKeyRecord.getBytes().array()))); } } @@ -137,14 +196,19 @@ public class NMContainerTokenSecretManag removeAnyContainerTokenIfExpired(); + ContainerId containerId = tokenId.getContainerID(); Long expTime = tokenId.getExpiryTimeStamp(); // We might have multiple containers with same expiration time. if (!recentlyStartedContainerTracker.containsKey(expTime)) { recentlyStartedContainerTracker .put(expTime, new ArrayList<ContainerId>()); } - recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID()); - + recentlyStartedContainerTracker.get(expTime).add(containerId); + try { + stateStore.storeContainerToken(containerId, expTime); + } catch (IOException e) { + LOG.error("Unable to store token for container " + containerId, e); + } } protected synchronized void removeAnyContainerTokenIfExpired() { @@ -155,6 +219,13 @@ public class NMContainerTokenSecretManag while (containersI.hasNext()) { Entry<Long, List<ContainerId>> containerEntry = containersI.next(); if (containerEntry.getKey() < currTime) { + for (ContainerId container : containerEntry.getValue()) { + try { + stateStore.removeContainerToken(container); + } catch (IOException e) { + LOG.error("Unable to remove token for container " + container, e); + } + } containersI.remove(); } else { break; Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java Fri Jul 25 20:33:09 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.security; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -31,6 +32,9 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState; import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.security.MasterKeyData; @@ -45,16 +49,79 @@ public class NMTokenSecretManagerInNM ex private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys; private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap; + private final NMStateStoreService stateStore; private NodeId nodeId; - public NMTokenSecretManagerInNM() { + this(new NMNullStateStoreService()); + 
} + + public NMTokenSecretManagerInNM(NMStateStoreService stateStore) { this.oldMasterKeys = new HashMap<ApplicationAttemptId, MasterKeyData>(); appToAppAttemptMap = new HashMap<ApplicationId, List<ApplicationAttemptId>>(); + this.stateStore = stateStore; } + public synchronized void recover() + throws IOException { + RecoveredNMTokensState state = stateStore.loadNMTokensState(); + MasterKey key = state.getCurrentMasterKey(); + if (key != null) { + super.currentMasterKey = + new MasterKeyData(key, createSecretKey(key.getBytes().array())); + } + + key = state.getPreviousMasterKey(); + if (key != null) { + previousMasterKey = + new MasterKeyData(key, createSecretKey(key.getBytes().array())); + } + + // restore the serial number from the current master key + if (super.currentMasterKey != null) { + super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1; + } + + for (Map.Entry<ApplicationAttemptId, MasterKey> entry : + state.getApplicationMasterKeys().entrySet()) { + key = entry.getValue(); + oldMasterKeys.put(entry.getKey(), + new MasterKeyData(key, createSecretKey(key.getBytes().array()))); + } + + // reconstruct app to app attempts map + appToAppAttemptMap.clear(); + for (ApplicationAttemptId attempt : oldMasterKeys.keySet()) { + ApplicationId app = attempt.getApplicationId(); + List<ApplicationAttemptId> attempts = appToAppAttemptMap.get(app); + if (attempts == null) { + attempts = new ArrayList<ApplicationAttemptId>(); + appToAppAttemptMap.put(app, attempts); + } + attempts.add(attempt); + } + } + + private void updateCurrentMasterKey(MasterKeyData key) { + super.currentMasterKey = key; + try { + stateStore.storeNMTokenCurrentMasterKey(key.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to update current master key in state store", e); + } + } + + private void updatePreviousMasterKey(MasterKeyData key) { + previousMasterKey = key; + try { + stateStore.storeNMTokenPreviousMasterKey(key.getMasterKey()); + } catch (IOException e) { + 
LOG.error("Unable to update previous master key in state store", e); + } + } + /** * Used by NodeManagers to create a token-secret-manager with the key * obtained from the RM. This can happen during registration or when the RM @@ -62,20 +129,16 @@ public class NMTokenSecretManagerInNM ex */ @Private public synchronized void setMasterKey(MasterKey masterKey) { - LOG.info("Rolling master-key for nm-tokens, got key with id :" - + masterKey.getKeyId()); - if (super.currentMasterKey == null) { - super.currentMasterKey = - new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes() - .array())); - } else { - if (super.currentMasterKey.getMasterKey().getKeyId() != masterKey - .getKeyId()) { - this.previousMasterKey = super.currentMasterKey; - super.currentMasterKey = - new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes() - .array())); + // Update keys only if the key has changed. + if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey() + .getKeyId() != masterKey.getKeyId()) { + LOG.info("Rolling master-key for container-tokens, got key with id " + + masterKey.getKeyId()); + if (super.currentMasterKey != null) { + updatePreviousMasterKey(super.currentMasterKey); } + updateCurrentMasterKey(new MasterKeyData(masterKey, + createSecretKey(masterKey.getBytes().array()))); } } @@ -128,7 +191,7 @@ public class NMTokenSecretManagerInNM ex LOG.debug("Removing application attempts NMToken keys for application " + appId); for (ApplicationAttemptId appAttemptId : appAttemptList) { - this.oldMasterKeys.remove(appAttemptId); + removeAppAttemptKey(appAttemptId); } appToAppAttemptMap.remove(appId); } else { @@ -164,11 +227,11 @@ public class NMTokenSecretManagerInNM ex + identifier.getApplicationAttemptId().toString()); if (identifier.getKeyId() == currentMasterKey.getMasterKey() .getKeyId()) { - oldMasterKeys.put(appAttemptId, currentMasterKey); + updateAppAttemptKey(appAttemptId, currentMasterKey); } else if (previousMasterKey != null && 
identifier.getKeyId() == previousMasterKey.getMasterKey() .getKeyId()) { - oldMasterKeys.put(appAttemptId, previousMasterKey); + updateAppAttemptKey(appAttemptId, previousMasterKey); } else { throw new InvalidToken( "Older NMToken should not be used while starting the container."); @@ -193,4 +256,24 @@ public class NMTokenSecretManagerInNM ex public synchronized NodeId getNodeId() { return this.nodeId; } + + private void updateAppAttemptKey(ApplicationAttemptId attempt, + MasterKeyData key) { + this.oldMasterKeys.put(attempt, key); + try { + stateStore.storeNMTokenApplicationMasterKey(attempt, + key.getMasterKey()); + } catch (IOException e) { + LOG.error("Unable to store master key for application " + attempt, e); + } + } + + private void removeAppAttemptKey(ApplicationAttemptId attempt) { + this.oldMasterKeys.remove(attempt); + try { + stateStore.removeNMTokenApplicationMasterKey(attempt); + } catch (IOException e) { + LOG.error("Unable to remove master key for application " + attempt, e); + } + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java Fri Jul 25 20:33:09 2014 @@ 
-72,7 +72,7 @@ public class NodePage extends NMView { ._("Total Pmem allocated for Container", StringUtils.byteDesc(info.getTotalPmemAllocated() * BYTES_IN_MB)) ._("Pmem enforcement enabled", - info.isVmemCheckEnabled()) + info.isPmemCheckEnabled()) ._("Total VCores allocated for Containers", String.valueOf(info.getTotalVCoresAllocated())) ._("NodeHealthyStatus", Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto Fri Jul 25 20:33:09 2014 @@ -38,3 +38,9 @@ message LocalizedResourceProto { optional string localPath = 2; optional int64 size = 3; } + +message NMDBSchemaVersionProto { + optional int32 majorVersion = 1; + optional int32 minorVersion = 2; +} + Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java?rev=1613514&r1=1613513&r2=1613514&view=diff 
============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java Fri Jul 25 20:33:09 2014 @@ -18,16 +18,37 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.apache.hadoop.fs.CreateFlag.CREATE; +import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.junit.Assert.assertTrue; + +import java.io.File; import java.io.FileNotFoundException; +import java.io.FileReader; import java.io.InputStream; import java.io.IOException; +import java.io.LineNumberReader; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.EnumSet; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Random; -import org.junit.Assert; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; 
@@ -45,15 +66,13 @@ import org.apache.hadoop.util.Progressab import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream; -import static org.apache.hadoop.fs.CreateFlag.*; - - import org.junit.AfterClass; +import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; -import org.mockito.ArgumentMatcher; -import org.mockito.Matchers; -import static org.mockito.Mockito.*; +import org.junit.After; +import org.junit.Assert; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestDefaultContainerExecutor { @@ -191,6 +210,92 @@ public class TestDefaultContainerExecuto } } + @Test + public void testContainerLaunchError() + throws IOException, InterruptedException { + + Path localDir = new Path(BASE_TMP_PATH, "localDir"); + List<String> localDirs = new ArrayList<String>(); + localDirs.add(localDir.toString()); + List<String> logDirs = new ArrayList<String>(); + Path logDir = new Path(BASE_TMP_PATH, "logDir"); + logDirs.add(logDir.toString()); + + Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.toString()); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString()); + + FileContext lfs = FileContext.getLocalFSFileContext(conf); + DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(lfs)); + mockExec.setConf(conf); + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + String diagnostics = (String) invocationOnMock.getArguments()[0]; + assertTrue("Invalid Diagnostics message: " + diagnostics, + diagnostics.contains("No such file or directory")); + return null; + } + } + ).when(mockExec).logOutput(any(String.class)); + + String appSubmitter = "nobody"; + String appId = "APP_ID"; + 
String containerId = "CONTAINER_ID"; + Container container = mock(Container.class); + ContainerId cId = mock(ContainerId.class); + ContainerLaunchContext context = mock(ContainerLaunchContext.class); + HashMap<String, String> env = new HashMap<String, String>(); + + when(container.getContainerId()).thenReturn(cId); + when(container.getLaunchContext()).thenReturn(context); + try { + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + ContainerDiagnosticsUpdateEvent event = + (ContainerDiagnosticsUpdateEvent) invocationOnMock + .getArguments()[0]; + assertTrue("Invalid Diagnostics message: " + + event.getDiagnosticsUpdate(), + event.getDiagnosticsUpdate().contains("No such file or directory") + ); + return null; + } + }).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class)); + + when(cId.toString()).thenReturn(containerId); + when(cId.getApplicationAttemptId()).thenReturn( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 0)); + + when(context.getEnvironment()).thenReturn(env); + + mockExec.createUserLocalDirs(localDirs, appSubmitter); + mockExec.createUserCacheDirs(localDirs, appSubmitter); + mockExec.createAppDirs(localDirs, appSubmitter, appId); + mockExec.createAppLogDirs(appId, logDirs); + + Path scriptPath = new Path("file:///bin/echo"); + Path tokensPath = new Path("file:///dev/null"); + Path workDir = localDir; + Path pidFile = new Path(workDir, "pid.txt"); + + mockExec.init(); + mockExec.activateContainer(cId, pidFile); + int ret = mockExec + .launchContainer(container, scriptPath, tokensPath, appSubmitter, + appId, workDir, localDirs, localDirs); + Assert.assertNotSame(0, ret); + } finally { + mockExec.deleteAsUser(appSubmitter, localDir); + mockExec.deleteAsUser(appSubmitter, logDir); + } + } + // @Test // public void testInit() throws IOException, InterruptedException { // Configuration conf = new Configuration(); Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java Fri Jul 25 20:33:09 2014 @@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.io.File; @@ -34,8 +38,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -46,9 +48,13 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; 
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent; +import org.junit.Assert; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestLinuxContainerExecutorWithMocks { @@ -216,7 +222,19 @@ public class TestLinuxContainerExecutorW conf.set(YarnConfiguration.NM_LOCAL_DIRS, "file:///bin/echo"); conf.set(YarnConfiguration.NM_LOG_DIRS, "file:///dev/null"); - mockExec = new LinuxContainerExecutor(); + mockExec = spy(new LinuxContainerExecutor()); + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + String diagnostics = (String) invocationOnMock.getArguments()[0]; + assertTrue("Invalid Diagnostics message: " + diagnostics, + diagnostics.contains("badcommand")); + return null; + } + } + ).when(mockExec).logOutput(any(String.class)); dirsHandler = new LocalDirsHandlerService(); dirsHandler.init(conf); mockExec.setConf(conf); @@ -233,7 +251,22 @@ public class TestLinuxContainerExecutorW when(container.getContainerId()).thenReturn(cId); when(container.getLaunchContext()).thenReturn(context); - + doAnswer( + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) + throws Throwable { + ContainerDiagnosticsUpdateEvent event = + (ContainerDiagnosticsUpdateEvent) invocationOnMock + .getArguments()[0]; + assertTrue("Invalid Diagnostics message: " + + event.getDiagnosticsUpdate(), + event.getDiagnosticsUpdate().contains("badcommand")); + return null; + } + } + ).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class)); + when(cId.toString()).thenReturn(containerId); when(context.getEnvironment()).thenReturn(env); Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java Fri Jul 25 20:33:09 2014 @@ -25,14 +25,20 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; public class NMMemoryStateStoreService extends NMStateStoreService { private Map<TrackerKey, TrackerState> trackerStates; private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks; + private RecoveredNMTokensState nmTokenState; + private 
RecoveredContainerTokensState containerTokenState; public NMMemoryStateStoreService() { super(NMMemoryStateStoreService.class.getName()); @@ -113,6 +119,11 @@ public class NMMemoryStateStoreService e @Override protected void initStorage(Configuration conf) { + nmTokenState = new RecoveredNMTokensState(); + nmTokenState.applicationMasterKeys = + new HashMap<ApplicationAttemptId, MasterKey>(); + containerTokenState = new RecoveredContainerTokensState(); + containerTokenState.activeTokens = new HashMap<ContainerId, Long>(); trackerStates = new HashMap<TrackerKey, TrackerState>(); deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>(); } @@ -148,6 +159,89 @@ public class NMMemoryStateStoreService e } + @Override + public RecoveredNMTokensState loadNMTokensState() throws IOException { + // return a copy so caller can't modify our state + RecoveredNMTokensState result = new RecoveredNMTokensState(); + result.currentMasterKey = nmTokenState.currentMasterKey; + result.previousMasterKey = nmTokenState.previousMasterKey; + result.applicationMasterKeys = + new HashMap<ApplicationAttemptId, MasterKey>( + nmTokenState.applicationMasterKeys); + return result; + } + + @Override + public void storeNMTokenCurrentMasterKey(MasterKey key) + throws IOException { + MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; + nmTokenState.currentMasterKey = new MasterKeyPBImpl(keypb.getProto()); + } + + @Override + public void storeNMTokenPreviousMasterKey(MasterKey key) + throws IOException { + MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; + nmTokenState.previousMasterKey = new MasterKeyPBImpl(keypb.getProto()); + } + + @Override + public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt, + MasterKey key) throws IOException { + MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; + nmTokenState.applicationMasterKeys.put(attempt, + new MasterKeyPBImpl(keypb.getProto())); + } + + @Override + public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt) + 
throws IOException { + nmTokenState.applicationMasterKeys.remove(attempt); + } + + + @Override + public RecoveredContainerTokensState loadContainerTokensState() + throws IOException { + // return a copy so caller can't modify our state + RecoveredContainerTokensState result = + new RecoveredContainerTokensState(); + result.currentMasterKey = containerTokenState.currentMasterKey; + result.previousMasterKey = containerTokenState.previousMasterKey; + result.activeTokens = + new HashMap<ContainerId, Long>(containerTokenState.activeTokens); + return result; + } + + @Override + public void storeContainerTokenCurrentMasterKey(MasterKey key) + throws IOException { + MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; + containerTokenState.currentMasterKey = + new MasterKeyPBImpl(keypb.getProto()); + } + + @Override + public void storeContainerTokenPreviousMasterKey(MasterKey key) + throws IOException { + MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; + containerTokenState.previousMasterKey = + new MasterKeyPBImpl(keypb.getProto()); + } + + @Override + public void storeContainerToken(ContainerId containerId, + Long expirationTime) throws IOException { + containerTokenState.activeTokens.put(containerId, expirationTime); + } + + @Override + public void removeContainerToken(ContainerId containerId) + throws IOException { + containerTokenState.activeTokens.remove(containerId); + } + + private static class TrackerState { Map<Path, LocalResourceProto> inProgressMap = new HashMap<Path, LocalResourceProto>(); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java Fri Jul 25 20:33:09 2014 @@ -20,15 +20,20 @@ package org.apache.hadoop.yarn.server.no import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -37,12 +42,20 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; +import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion; +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -97,6 +110,36 @@ public class TestNMLeveldbStateStoreServ assertTrue(stateStore.canRecover()); verifyEmptyState(); } + + @Test + public void testCheckVersion() throws IOException { + // default version + NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion(); + Assert.assertEquals(defaultVersion, stateStore.loadVersion()); + + // compatible version + NMDBSchemaVersion compatibleVersion = + NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(), + defaultVersion.getMinorVersion() + 2); + stateStore.storeVersion(compatibleVersion); + Assert.assertEquals(compatibleVersion, stateStore.loadVersion()); + restartStateStore(); + // overwrite the compatible version + Assert.assertEquals(defaultVersion, stateStore.loadVersion()); + + // 
incompatible version + NMDBSchemaVersion incompatibleVersion = + NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1, + defaultVersion.getMinorVersion()); + stateStore.storeVersion(incompatibleVersion); + try { + restartStateStore(); + Assert.fail("Incompatible version, should expect fail here."); + } catch (ServiceStateException e) { + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for NM state:")); + } + } @Test public void testStartResourceLocalization() throws IOException { @@ -460,4 +503,159 @@ public class TestNMLeveldbStateStoreServ state = stateStore.loadDeletionServiceState(); assertTrue(state.getTasks().isEmpty()); } + + @Test + public void testNMTokenStorage() throws IOException { + // test empty when no state + RecoveredNMTokensState state = stateStore.loadNMTokensState(); + assertNull(state.getCurrentMasterKey()); + assertNull(state.getPreviousMasterKey()); + assertTrue(state.getApplicationMasterKeys().isEmpty()); + + // store a master key and verify recovered + NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest(); + MasterKey currentKey = secretMgr.generateKey(); + stateStore.storeNMTokenCurrentMasterKey(currentKey); + restartStateStore(); + state = stateStore.loadNMTokensState(); + assertEquals(currentKey, state.getCurrentMasterKey()); + assertNull(state.getPreviousMasterKey()); + assertTrue(state.getApplicationMasterKeys().isEmpty()); + + // store a previous key and verify recovered + MasterKey prevKey = secretMgr.generateKey(); + stateStore.storeNMTokenPreviousMasterKey(prevKey); + restartStateStore(); + state = stateStore.loadNMTokensState(); + assertEquals(currentKey, state.getCurrentMasterKey()); + assertEquals(prevKey, state.getPreviousMasterKey()); + assertTrue(state.getApplicationMasterKeys().isEmpty()); + + // store a few application keys and verify recovered + ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(1, 
1), 1); + MasterKey attemptKey1 = secretMgr.generateKey(); + stateStore.storeNMTokenApplicationMasterKey(attempt1, attemptKey1); + ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(2, 3), 4); + MasterKey attemptKey2 = secretMgr.generateKey(); + stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2); + restartStateStore(); + state = stateStore.loadNMTokensState(); + assertEquals(currentKey, state.getCurrentMasterKey()); + assertEquals(prevKey, state.getPreviousMasterKey()); + Map<ApplicationAttemptId, MasterKey> loadedAppKeys = + state.getApplicationMasterKeys(); + assertEquals(2, loadedAppKeys.size()); + assertEquals(attemptKey1, loadedAppKeys.get(attempt1)); + assertEquals(attemptKey2, loadedAppKeys.get(attempt2)); + + // add/update/remove keys and verify recovered + ApplicationAttemptId attempt3 = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(5, 6), 7); + MasterKey attemptKey3 = secretMgr.generateKey(); + stateStore.storeNMTokenApplicationMasterKey(attempt3, attemptKey3); + stateStore.removeNMTokenApplicationMasterKey(attempt1); + attemptKey2 = prevKey; + stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2); + prevKey = currentKey; + stateStore.storeNMTokenPreviousMasterKey(prevKey); + currentKey = secretMgr.generateKey(); + stateStore.storeNMTokenCurrentMasterKey(currentKey); + restartStateStore(); + state = stateStore.loadNMTokensState(); + assertEquals(currentKey, state.getCurrentMasterKey()); + assertEquals(prevKey, state.getPreviousMasterKey()); + loadedAppKeys = state.getApplicationMasterKeys(); + assertEquals(2, loadedAppKeys.size()); + assertNull(loadedAppKeys.get(attempt1)); + assertEquals(attemptKey2, loadedAppKeys.get(attempt2)); + assertEquals(attemptKey3, loadedAppKeys.get(attempt3)); + } + + @Test + public void testContainerTokenStorage() throws IOException { + // test empty when no state + RecoveredContainerTokensState state = + 
stateStore.loadContainerTokensState(); + assertNull(state.getCurrentMasterKey()); + assertNull(state.getPreviousMasterKey()); + assertTrue(state.getActiveTokens().isEmpty()); + + // store a master key and verify recovered + ContainerTokenKeyGeneratorForTest keygen = + new ContainerTokenKeyGeneratorForTest(new YarnConfiguration()); + MasterKey currentKey = keygen.generateKey(); + stateStore.storeContainerTokenCurrentMasterKey(currentKey); + restartStateStore(); + state = stateStore.loadContainerTokensState(); + assertEquals(currentKey, state.getCurrentMasterKey()); + assertNull(state.getPreviousMasterKey()); + assertTrue(state.getActiveTokens().isEmpty()); + + // store a previous key and verify recovered + MasterKey prevKey = keygen.generateKey(); + stateStore.storeContainerTokenPreviousMasterKey(prevKey); + restartStateStore(); + state = stateStore.loadContainerTokensState(); + assertEquals(currentKey, state.getCurrentMasterKey()); + assertEquals(prevKey, state.getPreviousMasterKey()); + assertTrue(state.getActiveTokens().isEmpty()); + + // store a few container tokens and verify recovered + ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1); + Long expTime1 = 1234567890L; + ContainerId cid2 = BuilderUtils.newContainerId(2, 2, 2, 2); + Long expTime2 = 9876543210L; + stateStore.storeContainerToken(cid1, expTime1); + stateStore.storeContainerToken(cid2, expTime2); + restartStateStore(); + state = stateStore.loadContainerTokensState(); + assertEquals(currentKey, state.getCurrentMasterKey()); + assertEquals(prevKey, state.getPreviousMasterKey()); + Map<ContainerId, Long> loadedActiveTokens = + state.getActiveTokens(); + assertEquals(2, loadedActiveTokens.size()); + assertEquals(expTime1, loadedActiveTokens.get(cid1)); + assertEquals(expTime2, loadedActiveTokens.get(cid2)); + + // add/update/remove tokens and verify recovered + ContainerId cid3 = BuilderUtils.newContainerId(3, 3, 3, 3); + Long expTime3 = 135798642L; + stateStore.storeContainerToken(cid3, 
expTime3); + stateStore.removeContainerToken(cid1); + expTime2 += 246897531L; + stateStore.storeContainerToken(cid2, expTime2); + prevKey = currentKey; + stateStore.storeContainerTokenPreviousMasterKey(prevKey); + currentKey = keygen.generateKey(); + stateStore.storeContainerTokenCurrentMasterKey(currentKey); + restartStateStore(); + state = stateStore.loadContainerTokensState(); + assertEquals(currentKey, state.getCurrentMasterKey()); + assertEquals(prevKey, state.getPreviousMasterKey()); + loadedActiveTokens = state.getActiveTokens(); + assertEquals(2, loadedActiveTokens.size()); + assertNull(loadedActiveTokens.get(cid1)); + assertEquals(expTime2, loadedActiveTokens.get(cid2)); + assertEquals(expTime3, loadedActiveTokens.get(cid3)); + } + + private static class NMTokenSecretManagerForTest extends + BaseNMTokenSecretManager { + public MasterKey generateKey() { + return createNewMasterKey().getMasterKey(); + } + } + + private static class ContainerTokenKeyGeneratorForTest extends + BaseContainerTokenSecretManager { + public ContainerTokenKeyGeneratorForTest(Configuration conf) { + super(conf); + } + + public MasterKey generateKey() { + return createNewMasterKey().getMasterKey(); + } + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Fri Jul 25 20:33:09 2014 @@ -194,6 +194,21 @@ <!-- 'mvn dependency:analyze' fails to detect use 
of this dependency --> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <scope>test</scope> + <type>test-jar</type> + <version>${project.version}</version> + </dependency> + + <!-- 'mvn dependency:analyze' fails to detect use of this dependency --> + <dependency> <groupId>com.sun.jersey.jersey-test-framework</groupId> <artifactId>jersey-test-framework-grizzly2</artifactId> <scope>test</scope> Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Fri Jul 25 20:33:09 2014 @@ -1035,8 +1035,8 @@ public class ResourceManager extends Com StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); try { Configuration conf = new YarnConfiguration(); - // If -format, then delete RMStateStore; else startup normally - if (argv.length == 1 && argv[0].equals("-format")) { + // If -format-state-store, then delete RMStateStore; else startup normally + if (argv.length == 1 && 
argv[0].equals("-format-state-store")) { deleteRMStateStore(conf); } else { ResourceManager resourceManager = new ResourceManager(); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Jul 25 20:33:09 2014 @@ -205,12 +205,6 @@ public class RMAppImpl implements RMApp, .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) - // ACCECPTED state can once again receive APP_ACCEPTED event, because on - // recovery the app returns ACCEPTED state and the app once again go - // through the scheduler and triggers one more APP_ACCEPTED event at - // ACCEPTED state. 
- .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.APP_ACCEPTED) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -789,8 +783,18 @@ public class RMAppImpl implements RMApp, return app.recoveredFinalState; } - // Notify scheduler about the app on recovery - new AddApplicationToSchedulerTransition().transition(app, event); + // No existent attempts means the attempt associated with this app was not + // started or started but not yet saved. + if (app.attempts.isEmpty()) { + app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, + app.submissionContext.getQueue(), app.user)); + return RMAppState.SUBMITTED; + } + + // Add application to scheduler synchronously to guarantee scheduler + // knows applications before AM or NM re-registers. + app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId, + app.submissionContext.getQueue(), app.user, true)); // recover attempts app.recoverAppAttempts(); @@ -805,12 +809,6 @@ public class RMAppImpl implements RMApp, return RMAppState.ACCEPTED; } - // No existent attempts means the attempt associated with this app was not - // started or started but not yet saved. - if (app.attempts.isEmpty()) { - return RMAppState.SUBMITTED; - } - // YARN-1507 is saving the application state after the application is // accepted. So after YARN-1507, an app is saved meaning it is accepted. // Thus we return ACCECPTED state on recovery. 
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Fri Jul 25 20:33:09 2014 @@ -774,11 +774,9 @@ public class RMAppAttemptImpl implements } // create AMRMToken - AMRMTokenIdentifier id = - new AMRMTokenIdentifier(appAttempt.applicationAttemptId); appAttempt.amrmToken = - new Token<AMRMTokenIdentifier>(id, - appAttempt.rmContext.getAMRMTokenSecretManager()); + appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttempt.applicationAttemptId); // Add the applicationAttempt to the scheduler and inform the scheduler // whether to transfer the state from previous attempt. @@ -926,8 +924,10 @@ public class RMAppAttemptImpl implements appAttempt.masterService .registerAppAttempt(appAttempt.applicationAttemptId); - appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( - appAttempt.getAppAttemptId(), false, false)); + // Add attempt to scheduler synchronously to guarantee scheduler + // knows attempts before AM or NM re-registers. 
+ appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent( + appAttempt.getAppAttemptId(), false, true)); } /* @@ -1294,12 +1294,20 @@ public class RMAppAttemptImpl implements private String getAMContainerCrashedDiagnostics( RMAppAttemptContainerFinishedEvent finishEvent) { ContainerStatus status = finishEvent.getContainerStatus(); - String diagnostics = - "AM Container for " + finishEvent.getApplicationAttemptId() - + " exited with " + " exitCode: " + status.getExitStatus() + ". " - + "Check application tracking page: " + this.getTrackingUrl() - + " . Then, click on links to logs of each attempt for detailed output. "; - return diagnostics; + StringBuilder diagnosticsBuilder = new StringBuilder(); + diagnosticsBuilder.append("AM Container for ").append( + finishEvent.getApplicationAttemptId()).append( + " exited with ").append(" exitCode: ").append(status.getExitStatus()). + append("\n"); + if (this.getTrackingUrl() != null) { + diagnosticsBuilder.append("For more detailed output,").append( + " check application tracking page:").append( + this.getTrackingUrl()).append( + "Then, click on links to logs of each attempt.\n"); + } + diagnosticsBuilder.append("Diagnostics: ").append(status.getDiagnostics()) + .append("Failing this attempt"); + return diagnosticsBuilder.toString(); } private static class FinalTransition extends BaseFinalTransition { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Fri Jul 25 20:33:09 2014 @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; +import java.util.List; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; /** @@ -73,5 +76,7 @@ public interface RMContainer extends Eve ContainerReport createContainerReport(); boolean isAMContainer(); + + List<ResourceRequest> getResourceRequests(); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff ============================================================================== --- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Fri Jul 25 20:33:09 2014 @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; import java.util.EnumSet; +import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -158,6 +160,7 @@ public class RMContainerImpl implements private long finishTime; private ContainerStatus finishedStatus; private boolean isAMContainer; + private List<ResourceRequest> resourceRequests; public RMContainerImpl(Container container, ApplicationAttemptId appAttemptId, NodeId nodeId, String user, @@ -180,7 +183,8 @@ public class RMContainerImpl implements this.eventHandler = rmContext.getDispatcher().getEventHandler(); this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer(); this.isAMContainer = false; - + this.resourceRequests = null; + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -311,6 +315,25 @@ public class 
RMContainerImpl implements readLock.unlock(); } } + + @Override + public List<ResourceRequest> getResourceRequests() { + try { + readLock.lock(); + return resourceRequests; + } finally { + readLock.unlock(); + } + } + + public void setResourceRequests(List<ResourceRequest> requests) { + try { + writeLock.lock(); + this.resourceRequests = requests; + } finally { + writeLock.unlock(); + } + } @Override public String toString() { @@ -432,6 +455,9 @@ public class RMContainerImpl implements @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + // Clear ResourceRequest stored in RMContainer + container.setResourceRequests(null); + // Register with containerAllocationExpirer. container.containerAllocationExpirer.register(container.getContainerId());
