Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 Fri Jul 25 20:33:09 2014
@@ -169,6 +169,15 @@ public class NodeManager extends Composi
     }
   }
 
+  private void recoverTokens(NMTokenSecretManagerInNM nmTokenSecretManager,
+      NMContainerTokenSecretManager containerTokenSecretManager)
+          throws IOException {
+    if (nmStore.canRecover()) {
+      nmTokenSecretManager.recover();
+      containerTokenSecretManager.recover();
+    }
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
 
@@ -181,10 +190,12 @@ public class NodeManager extends Composi
     initAndStartRecoveryStore(conf);
 
     NMContainerTokenSecretManager containerTokenSecretManager =
-        new NMContainerTokenSecretManager(conf);
+        new NMContainerTokenSecretManager(conf, nmStore);
 
     NMTokenSecretManagerInNM nmTokenSecretManager =
-        new NMTokenSecretManagerInNM();
+        new NMTokenSecretManagerInNM(nmStore);
+
+    recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
     
     this.aclsManager = new ApplicationACLsManager(conf);
 

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
 Fri Jul 25 20:33:09 2014
@@ -35,11 +35,19 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.fusesource.leveldbjni.JniDBFactory;
@@ -50,14 +58,18 @@ import org.iq80.leveldb.Logger;
 import org.iq80.leveldb.Options;
 import org.iq80.leveldb.WriteBatch;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class NMLeveldbStateStoreService extends NMStateStoreService {
 
   public static final Log LOG =
       LogFactory.getLog(NMLeveldbStateStoreService.class);
 
   private static final String DB_NAME = "yarn-nm-state";
-  private static final String DB_SCHEMA_VERSION_KEY = "schema-version";
-  private static final String DB_SCHEMA_VERSION = "1.0";
+  private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
+  
+  private static final NMDBSchemaVersion CURRENT_VERSION_INFO = 
NMDBSchemaVersion
+      .newInstance(1, 0);
 
   private static final String DELETION_TASK_KEY_PREFIX =
       "DeletionService/deltask_";
@@ -72,6 +84,20 @@ public class NMLeveldbStateStoreService 
   private static final String LOCALIZATION_FILECACHE_SUFFIX = "filecache/";
   private static final String LOCALIZATION_APPCACHE_SUFFIX = "appcache/";
 
+  private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
+  private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
+  private static final String NM_TOKENS_KEY_PREFIX = "NMTokens/";
+  private static final String NM_TOKENS_CURRENT_MASTER_KEY =
+      NM_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
+  private static final String NM_TOKENS_PREV_MASTER_KEY =
+      NM_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
+  private static final String CONTAINER_TOKENS_KEY_PREFIX =
+      "ContainerTokens/";
+  private static final String CONTAINER_TOKENS_CURRENT_MASTER_KEY =
+      CONTAINER_TOKENS_KEY_PREFIX + CURRENT_MASTER_KEY_SUFFIX;
+  private static final String CONTAINER_TOKENS_PREV_MASTER_KEY =
+      CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX;
+
   private DB db;
 
   public NMLeveldbStateStoreService() {
@@ -122,7 +148,7 @@ public class NMLeveldbStateStoreService 
             key.substring(0, userEndPos+1)));
       }
     } catch (DBException e) {
-      throw new IOException(e.getMessage(), e);
+      throw new IOException(e);
     } finally {
       if (iter != null) {
         iter.close();
@@ -241,7 +267,7 @@ public class NMLeveldbStateStoreService 
     try {
       db.put(bytes(key), proto.toByteArray());
     } catch (DBException e) {
-      throw new IOException(e.getMessage(), e);
+      throw new IOException(e);
     }
   }
 
@@ -264,7 +290,7 @@ public class NMLeveldbStateStoreService 
         batch.close();
       }
     } catch (DBException e) {
-      throw new IOException(e.getMessage(), e);
+      throw new IOException(e);
     }
   }
 
@@ -287,7 +313,7 @@ public class NMLeveldbStateStoreService 
         batch.close();
       }
     } catch (DBException e) {
-      throw new IOException(e.getMessage(), e);
+      throw new IOException(e);
     }
   }
 
@@ -336,7 +362,7 @@ public class NMLeveldbStateStoreService 
             DeletionServiceDeleteTaskProto.parseFrom(entry.getValue()));
       }
     } catch (DBException e) {
-      throw new IOException(e.getMessage(), e);
+      throw new IOException(e);
     } finally {
       if (iter != null) {
         iter.close();
@@ -352,7 +378,7 @@ public class NMLeveldbStateStoreService 
     try {
       db.put(bytes(key), taskProto.toByteArray());
     } catch (DBException e) {
-      throw new IOException(e.getMessage(), e);
+      throw new IOException(e);
     }
   }
 
@@ -362,7 +388,178 @@ public class NMLeveldbStateStoreService 
     try {
       db.delete(bytes(key));
     } catch (DBException e) {
-      throw new IOException(e.getMessage(), e);
+      throw new IOException(e);
+    }
+  }
+
+
+  @Override
+  public RecoveredNMTokensState loadNMTokensState() throws IOException {
+    RecoveredNMTokensState state = new RecoveredNMTokensState();
+    state.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>();
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(NM_TOKENS_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String fullKey = asString(entry.getKey());
+        if (!fullKey.startsWith(NM_TOKENS_KEY_PREFIX)) {
+          break;
+        }
+        String key = fullKey.substring(NM_TOKENS_KEY_PREFIX.length());
+        if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
+          state.currentMasterKey = parseMasterKey(entry.getValue());
+        } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
+          state.previousMasterKey = parseMasterKey(entry.getValue());
+        } else if (key.startsWith(
+            ApplicationAttemptId.appAttemptIdStrPrefix)) {
+          ApplicationAttemptId attempt;
+          try {
+            attempt = ConverterUtils.toApplicationAttemptId(key);
+          } catch (IllegalArgumentException e) {
+            throw new IOException("Bad application master key state for "
+                + fullKey, e);
+          }
+          state.applicationMasterKeys.put(attempt,
+              parseMasterKey(entry.getValue()));
+        }
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return state;
+  }
+
+  @Override
+  public void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+    storeMasterKey(NM_TOKENS_CURRENT_MASTER_KEY, key);
+  }
+
+  @Override
+  public void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+    storeMasterKey(NM_TOKENS_PREV_MASTER_KEY, key);
+  }
+
+  @Override
+  public void storeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt, MasterKey key) throws IOException {
+    storeMasterKey(NM_TOKENS_KEY_PREFIX + attempt, key);
+  }
+
+  @Override
+  public void removeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt) throws IOException {
+    String key = NM_TOKENS_KEY_PREFIX + attempt;
+    try {
+      db.delete(bytes(key));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private MasterKey parseMasterKey(byte[] keyData) throws IOException {
+    return new MasterKeyPBImpl(MasterKeyProto.parseFrom(keyData));
+  }
+
+  // Persists the protobuf form of key under dbKey; DB errors become IOException.
+  private void storeMasterKey(String dbKey, MasterKey key) throws IOException {
+    byte[] serialized = ((MasterKeyPBImpl) key).getProto().toByteArray();
+    try {
+      db.put(bytes(dbKey), serialized);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+
+  @Override
+  public RecoveredContainerTokensState loadContainerTokensState()
+      throws IOException {
+    RecoveredContainerTokensState state = new RecoveredContainerTokensState();
+    state.activeTokens = new HashMap<ContainerId, Long>();
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(CONTAINER_TOKENS_KEY_PREFIX));
+      final int containerTokensKeyPrefixLength =
+          CONTAINER_TOKENS_KEY_PREFIX.length();
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> entry = iter.next();
+        String fullKey = asString(entry.getKey());
+        if (!fullKey.startsWith(CONTAINER_TOKENS_KEY_PREFIX)) {
+          break;
+        }
+        String key = fullKey.substring(containerTokensKeyPrefixLength);
+        if (key.equals(CURRENT_MASTER_KEY_SUFFIX)) {
+          state.currentMasterKey = parseMasterKey(entry.getValue());
+        } else if (key.equals(PREV_MASTER_KEY_SUFFIX)) {
+          state.previousMasterKey = parseMasterKey(entry.getValue());
+        } else if (key.startsWith(ConverterUtils.CONTAINER_PREFIX)) {
+          loadContainerToken(state, fullKey, key, entry.getValue());
+        }
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return state;
+  }
+
+  private static void loadContainerToken(RecoveredContainerTokensState state,
+      String key, String containerIdStr, byte[] value) throws IOException {
+    ContainerId containerId;
+    Long expTime;
+    try {
+      containerId = ConverterUtils.toContainerId(containerIdStr);
+      expTime = Long.parseLong(asString(value));
+    } catch (IllegalArgumentException e) {
+      throw new IOException("Bad container token state for " + key, e);
+    }
+    state.activeTokens.put(containerId, expTime);
+  }
+
+  @Override
+  public void storeContainerTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+    storeMasterKey(CONTAINER_TOKENS_CURRENT_MASTER_KEY, key);
+  }
+
+  @Override
+  public void storeContainerTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+    storeMasterKey(CONTAINER_TOKENS_PREV_MASTER_KEY, key);
+  }
+
+  @Override
+  public void storeContainerToken(ContainerId containerId, Long expTime)
+      throws IOException {
+    String key = CONTAINER_TOKENS_KEY_PREFIX + containerId;
+    try {
+      db.put(bytes(key), bytes(expTime.toString()));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeContainerToken(ContainerId containerId)
+      throws IOException {
+    String key = CONTAINER_TOKENS_KEY_PREFIX + containerId;
+    try {
+      db.delete(bytes(key));
+    } catch (DBException e) {
+      throw new IOException(e);
     }
   }
 
@@ -376,22 +573,16 @@ public class NMLeveldbStateStoreService 
     options.logger(new LeveldbLogger());
     LOG.info("Using state database at " + storeRoot + " for recovery");
     File dbfile = new File(storeRoot.toString());
-    byte[] schemaVersionData = null;
     try {
       db = JniDBFactory.factory.open(dbfile, options);
-      try {
-        schemaVersionData = db.get(bytes(DB_SCHEMA_VERSION_KEY));
-      } catch (DBException e) {
-        throw new IOException(e.getMessage(), e);
-      }
     } catch (NativeDB.DBException e) {
       if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
         LOG.info("Creating state database at " + dbfile);
         options.createIfMissing(true);
         try {
           db = JniDBFactory.factory.open(dbfile, options);
-          schemaVersionData = bytes(DB_SCHEMA_VERSION);
-          db.put(bytes(DB_SCHEMA_VERSION_KEY), schemaVersionData);
+          // store version
+          storeVersion();
         } catch (DBException dbErr) {
           throw new IOException(dbErr.getMessage(), dbErr);
         }
@@ -399,16 +590,7 @@ public class NMLeveldbStateStoreService 
         throw e;
       }
     }
-    if (schemaVersionData != null) {
-      String schemaVersion = asString(schemaVersionData);
-      // only support exact schema matches for now
-      if (!DB_SCHEMA_VERSION.equals(schemaVersion)) {
-        throw new IOException("Incompatible state database schema, found "
-            + schemaVersion + " expected " + DB_SCHEMA_VERSION);
-      }
-    } else {
-      throw new IOException("State database schema version not found");
-    }
+    checkVersion();
   }
 
   private Path createStorageDir(Configuration conf) throws IOException {
@@ -433,4 +615,90 @@ public class NMLeveldbStateStoreService 
       LOG.info(message);
     }
   }
+
+  /**
+   * Reads the schema version recorded in the state database.
+   *
+   * @return the stored version, or version 1.0 when none has been stored
+   * @throws IOException if the database read fails or the stored version
+   *         cannot be parsed
+   */
+  NMDBSchemaVersion loadVersion() throws IOException {
+    byte[] data;
+    try {
+      data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
+    } catch (DBException e) {
+      // report leveldb failures as IOException like the other DB accesses
+      throw new IOException(e);
+    }
+    // if version is not stored previously, treat it as 1.0.
+    if (data == null || data.length == 0) {
+      return NMDBSchemaVersion.newInstance(1, 0);
+    }
+    return new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
+  }
+
+  /** Stores the current schema version in the state database. */
+  private void storeVersion() throws IOException {
+    dbStoreVersion(CURRENT_VERSION_INFO);
+  }
+
+  /** Stores the given schema version. Only used for test. */
+  @VisibleForTesting
+  void storeVersion(NMDBSchemaVersion state) throws IOException {
+    dbStoreVersion(state);
+  }
+
+  /**
+   * Serializes the version and writes it under the schema version key.
+   *
+   * @throws IOException if the database write fails
+   */
+  private void dbStoreVersion(NMDBSchemaVersion state) throws IOException {
+    String key = DB_SCHEMA_VERSION_KEY;
+    byte[] data =
+        ((NMDBSchemaVersionPBImpl) state).getProto().toByteArray();
+    try {
+      db.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Returns the schema version written by this implementation. */
+  NMDBSchemaVersion getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+
+  /**
+   * Checks the stored schema version against the current one.
+   * 1) Versioning scheme: major.minor, e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+   * 2) Any incompatible change of state-store is a major upgrade, and any
+   *    compatible change of state-store is a minor upgrade.
+   * 3) Within a minor upgrade, say 1.1 to 1.2:
+   *    overwrite the version info and proceed as normal.
+   * 4) Within a major upgrade, say 1.2 to 2.0:
+   *    throw exception and indicate user to use a separate upgrade tool to
+   *    upgrade NM state or remove incompatible old state.
+   *
+   * @throws IOException if the stored version is incompatible or the
+   *         database cannot be read or written
+   */
+  private void checkVersion() throws IOException {
+    NMDBSchemaVersion loadedVersion = loadVersion();
+    LOG.info("Loaded NM state version info " + loadedVersion);
+    // loadVersion() always yields a version object, so no null check here
+    if (loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing NM state version info " + getCurrentVersion());
+      storeVersion();
+    } else {
+      throw new IOException(
+        "Incompatible version for NM state: expecting NM state version "
+            + getCurrentVersion() + ", but loading version " + loadedVersion);
+    }
+  }
+
 }

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
 Fri Jul 25 20:33:09 2014
@@ -22,10 +22,13 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 // The state store to use when state isn't being stored
 public class NMNullStateStoreService extends NMStateStoreService {
@@ -78,6 +81,59 @@ public class NMNullStateStoreService ext
   }
 
   @Override
+  public RecoveredNMTokensState loadNMTokensState() throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+  }
+
+  @Override
+  public void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+  }
+
+  @Override
+  public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt,
+      MasterKey key) throws IOException {
+  }
+
+  @Override
+  public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
+      throws IOException {
+  }
+
+  @Override
+  public RecoveredContainerTokensState loadContainerTokensState()
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "Recovery not supported by this state store");
+  }
+
+  @Override
+  public void storeContainerTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+  }
+
+  @Override
+  public void storeContainerTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+  }
+
+  @Override
+  public void storeContainerToken(ContainerId containerId,
+      Long expirationTime) throws IOException {
+  }
+
+  @Override
+  public void removeContainerToken(ContainerId containerId)
+      throws IOException {
+  }
+
+  @Override
   protected void initStorage(Configuration conf) throws IOException {
   }
 

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
 Fri Jul 25 20:33:09 2014
@@ -29,10 +29,13 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 
 @Private
 @Unstable
@@ -100,6 +103,42 @@ public abstract class NMStateStoreServic
     }
   }
 
+  public static class RecoveredNMTokensState {
+    MasterKey currentMasterKey;
+    MasterKey previousMasterKey;
+    Map<ApplicationAttemptId, MasterKey> applicationMasterKeys;
+
+    public MasterKey getCurrentMasterKey() {
+      return currentMasterKey;
+    }
+
+    public MasterKey getPreviousMasterKey() {
+      return previousMasterKey;
+    }
+
+    public Map<ApplicationAttemptId, MasterKey> getApplicationMasterKeys() {
+      return applicationMasterKeys;
+    }
+  }
+
+  public static class RecoveredContainerTokensState {
+    MasterKey currentMasterKey;
+    MasterKey previousMasterKey;
+    Map<ContainerId, Long> activeTokens;
+
+    public MasterKey getCurrentMasterKey() {
+      return currentMasterKey;
+    }
+
+    public MasterKey getPreviousMasterKey() {
+      return previousMasterKey;
+    }
+
+    public Map<ContainerId, Long> getActiveTokens() {
+      return activeTokens;
+    }
+  }
+
   /** Initialize the state storage */
   @Override
   public void serviceInit(Configuration conf) throws IOException {
@@ -173,6 +212,38 @@ public abstract class NMStateStoreServic
   public abstract void removeDeletionTask(int taskId) throws IOException;
 
 
+  public abstract RecoveredNMTokensState loadNMTokensState()
+      throws IOException;
+
+  public abstract void storeNMTokenCurrentMasterKey(MasterKey key)
+      throws IOException;
+
+  public abstract void storeNMTokenPreviousMasterKey(MasterKey key)
+      throws IOException;
+
+  public abstract void storeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt, MasterKey key) throws IOException;
+
+  public abstract void removeNMTokenApplicationMasterKey(
+      ApplicationAttemptId attempt) throws IOException;
+
+
+  public abstract RecoveredContainerTokensState loadContainerTokensState()
+      throws IOException;
+
+  public abstract void storeContainerTokenCurrentMasterKey(MasterKey key)
+      throws IOException;
+
+  public abstract void storeContainerTokenPreviousMasterKey(MasterKey key)
+      throws IOException;
+
+  public abstract void storeContainerToken(ContainerId containerId,
+      Long expirationTime) throws IOException;
+
+  public abstract void removeContainerToken(ContainerId containerId)
+      throws IOException;
+
+
   protected abstract void initStorage(Configuration conf) throws IOException;
 
   protected abstract void startStorage() throws IOException;

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
 Fri Jul 25 20:33:09 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.security;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -33,6 +34,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
 import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 
@@ -49,14 +53,74 @@ public class NMContainerTokenSecretManag
   
   private MasterKeyData previousMasterKey;
   private final TreeMap<Long, List<ContainerId>> 
recentlyStartedContainerTracker;
-
+  private final NMStateStoreService stateStore;
   
   private String nodeHostAddr;
   
   public NMContainerTokenSecretManager(Configuration conf) {
+    this(conf, new NMNullStateStoreService());
+  }
+
+  public NMContainerTokenSecretManager(Configuration conf,
+      NMStateStoreService stateStore) {
     super(conf);
     recentlyStartedContainerTracker =
         new TreeMap<Long, List<ContainerId>>();
+    this.stateStore = stateStore;
+  }
+
+  public synchronized void recover()
+      throws IOException {
+    RecoveredContainerTokensState state =
+        stateStore.loadContainerTokensState();
+    MasterKey key = state.getCurrentMasterKey();
+    if (key != null) {
+      super.currentMasterKey =
+          new MasterKeyData(key, createSecretKey(key.getBytes().array()));
+    }
+
+    key = state.getPreviousMasterKey();
+    if (key != null) {
+      previousMasterKey =
+          new MasterKeyData(key, createSecretKey(key.getBytes().array()));
+    }
+
+    // restore the serial number from the current master key
+    if (super.currentMasterKey != null) {
+      super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
+    }
+
+    for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) {
+      ContainerId containerId = entry.getKey();
+      Long expTime = entry.getValue();
+      List<ContainerId> containerList =
+          recentlyStartedContainerTracker.get(expTime);
+      if (containerList == null) {
+        containerList = new ArrayList<ContainerId>();
+        recentlyStartedContainerTracker.put(expTime, containerList);
+      }
+      if (!containerList.contains(containerId)) {
+        containerList.add(containerId);
+      }
+    }
+  }
+
+  private void updateCurrentMasterKey(MasterKeyData key) {
+    super.currentMasterKey = key;
+    try {
+      stateStore.storeContainerTokenCurrentMasterKey(key.getMasterKey());
+    } catch (IOException e) {
+      LOG.error("Unable to update current master key in state store", e);
+    }
+  }
+
+  private void updatePreviousMasterKey(MasterKeyData key) {
+    previousMasterKey = key;
+    try {
+      stateStore.storeContainerTokenPreviousMasterKey(key.getMasterKey());
+    } catch (IOException e) {
+      LOG.error("Unable to update previous master key in state store", e);
+    }
   }
 
   /**
@@ -68,21 +132,16 @@ public class NMContainerTokenSecretManag
    */
   @Private
   public synchronized void setMasterKey(MasterKey masterKeyRecord) {
-    LOG.info("Rolling master-key for container-tokens, got key with id "
-        + masterKeyRecord.getKeyId());
-    if (super.currentMasterKey == null) {
-      super.currentMasterKey =
-          new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
-            .getBytes().array()));
-    } else {
-      if (super.currentMasterKey.getMasterKey().getKeyId() != masterKeyRecord
-          .getKeyId()) {
-        // Update keys only if the key has changed.
-        this.previousMasterKey = super.currentMasterKey;
-        super.currentMasterKey =
-            new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
-              .getBytes().array()));
+    // Update keys only if the key has changed.
+    if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
+          .getKeyId() != masterKeyRecord.getKeyId()) {
+      LOG.info("Rolling master-key for container-tokens, got key with id "
+          + masterKeyRecord.getKeyId());
+      if (super.currentMasterKey != null) {
+        updatePreviousMasterKey(super.currentMasterKey);
       }
+      updateCurrentMasterKey(new MasterKeyData(masterKeyRecord,
+          createSecretKey(masterKeyRecord.getBytes().array())));
     }
   }
 
@@ -137,14 +196,19 @@ public class NMContainerTokenSecretManag
 
     removeAnyContainerTokenIfExpired();
     
+    ContainerId containerId = tokenId.getContainerID();
     Long expTime = tokenId.getExpiryTimeStamp();
     // We might have multiple containers with same expiration time.
     if (!recentlyStartedContainerTracker.containsKey(expTime)) {
       recentlyStartedContainerTracker
         .put(expTime, new ArrayList<ContainerId>());
     }
-    recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID());
-
+    recentlyStartedContainerTracker.get(expTime).add(containerId);
+    try {
+      stateStore.storeContainerToken(containerId, expTime);
+    } catch (IOException e) {
+      LOG.error("Unable to store token for container " + containerId, e);
+    }
   }
 
   protected synchronized void removeAnyContainerTokenIfExpired() {
@@ -155,6 +219,13 @@ public class NMContainerTokenSecretManag
     while (containersI.hasNext()) {
       Entry<Long, List<ContainerId>> containerEntry = containersI.next();
       if (containerEntry.getKey() < currTime) {
+        for (ContainerId container : containerEntry.getValue()) {
+          try {
+            stateStore.removeContainerToken(container);
+          } catch (IOException e) {
+            LOG.error("Unable to remove token for container " + container, e);
+          }
+        }
         containersI.remove();
       } else {
         break;

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
 Fri Jul 25 20:33:09 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.security;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 
@@ -45,16 +49,79 @@ public class NMTokenSecretManagerInNM ex
   
   private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys;
   private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap;
+  private final NMStateStoreService stateStore;
   private NodeId nodeId;                                                      
   
-  
   public NMTokenSecretManagerInNM() {
+    this(new NMNullStateStoreService());
+  }
+
+  public NMTokenSecretManagerInNM(NMStateStoreService stateStore) {
     this.oldMasterKeys =
         new HashMap<ApplicationAttemptId, MasterKeyData>();
     appToAppAttemptMap =         
         new HashMap<ApplicationId, List<ApplicationAttemptId>>();
+    this.stateStore = stateStore;
   }
   
+  /**
+   * Reloads the current, previous and per-attempt NM token master keys from
+   * the state store and rebuilds the application-to-attempts index.
+   *
+   * @throws IOException propagated from loading the NM token state
+   */
+  public synchronized void recover() throws IOException {
+    final RecoveredNMTokensState recovered = stateStore.loadNMTokensState();
+    final MasterKey current = recovered.getCurrentMasterKey();
+    if (current != null) {
+      super.currentMasterKey = new MasterKeyData(current,
+          createSecretKey(current.getBytes().array()));
+    }
+    final MasterKey previous = recovered.getPreviousMasterKey();
+    if (previous != null) {
+      previousMasterKey = new MasterKeyData(previous,
+          createSecretKey(previous.getBytes().array()));
+    }
+    // resume the serial number just past the current master key's id
+    if (super.currentMasterKey != null) {
+      super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
+    }
+    for (Map.Entry<ApplicationAttemptId, MasterKey> recoveredKey :
+         recovered.getApplicationMasterKeys().entrySet()) {
+      final MasterKey attemptKey = recoveredKey.getValue();
+      oldMasterKeys.put(recoveredKey.getKey(), new MasterKeyData(attemptKey,
+          createSecretKey(attemptKey.getBytes().array())));
+    }
+
+    // index every attempt held in oldMasterKeys under its application
+    appToAppAttemptMap.clear();
+    for (ApplicationAttemptId attemptId : oldMasterKeys.keySet()) {
+      final ApplicationId appId = attemptId.getApplicationId();
+      if (!appToAppAttemptMap.containsKey(appId)) {
+        appToAppAttemptMap.put(appId, new ArrayList<ApplicationAttemptId>());
+      }
+      appToAppAttemptMap.get(appId).add(attemptId);
+    }
+  }
+
+  private void updateCurrentMasterKey(MasterKeyData key) {
+    super.currentMasterKey = key; // in-memory update precedes the store write
+    try {
+      stateStore.storeNMTokenCurrentMasterKey(key.getMasterKey());
+    } catch (IOException e) { // store failure is logged, not rethrown
+      LOG.error("Unable to update current master key in state store", e);
+    }
+  }
+
+  private void updatePreviousMasterKey(MasterKeyData key) {
+    previousMasterKey = key; // in-memory update precedes the store write
+    try {
+      stateStore.storeNMTokenPreviousMasterKey(key.getMasterKey());
+    } catch (IOException e) { // store failure is logged, not rethrown
+      LOG.error("Unable to update previous master key in state store", e);
+    }
+  }
+
   /**
    * Used by NodeManagers to create a token-secret-manager with the key
    * obtained from the RM. This can happen during registration or when the RM
@@ -62,20 +129,16 @@ public class NMTokenSecretManagerInNM ex
    */
   @Private
   public synchronized void setMasterKey(MasterKey masterKey) {
-    LOG.info("Rolling master-key for nm-tokens, got key with id :"
-        + masterKey.getKeyId());
-    if (super.currentMasterKey == null) {
-      super.currentMasterKey =
-          new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
-            .array()));
-    } else {
-      if (super.currentMasterKey.getMasterKey().getKeyId() != masterKey
-        .getKeyId()) {
-        this.previousMasterKey = super.currentMasterKey;
-        super.currentMasterKey =
-            new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
-              .array()));
+    // Roll keys only when the incoming key id differs from the current one.
+    if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
+          .getKeyId() != masterKey.getKeyId()) {
+      LOG.info("Rolling master-key for nm-tokens, got key with id "
+          + masterKey.getKeyId());
+      if (super.currentMasterKey != null) { // outgoing key becomes previous
+        updatePreviousMasterKey(super.currentMasterKey);
       }
+      updateCurrentMasterKey(new MasterKeyData(masterKey,
+          createSecretKey(masterKey.getBytes().array())));
     }
   }
 
@@ -128,7 +191,7 @@ public class NMTokenSecretManagerInNM ex
       LOG.debug("Removing application attempts NMToken keys for application "
           + appId);
       for (ApplicationAttemptId appAttemptId : appAttemptList) {
-        this.oldMasterKeys.remove(appAttemptId);
+        removeAppAttemptKey(appAttemptId);
       }
       appToAppAttemptMap.remove(appId);
     } else {
@@ -164,11 +227,11 @@ public class NMTokenSecretManagerInNM ex
           + identifier.getApplicationAttemptId().toString());
       if (identifier.getKeyId() == currentMasterKey.getMasterKey()
         .getKeyId()) {
-        oldMasterKeys.put(appAttemptId, currentMasterKey);
+        updateAppAttemptKey(appAttemptId, currentMasterKey);
       } else if (previousMasterKey != null
           && identifier.getKeyId() == previousMasterKey.getMasterKey()
             .getKeyId()) {
-        oldMasterKeys.put(appAttemptId, previousMasterKey);
+        updateAppAttemptKey(appAttemptId, previousMasterKey);
       } else {
         throw new InvalidToken(
           "Older NMToken should not be used while starting the container.");
@@ -193,4 +256,24 @@ public class NMTokenSecretManagerInNM ex
   public synchronized NodeId getNodeId() {
     return this.nodeId;
   }
+
+  private void updateAppAttemptKey(ApplicationAttemptId attempt,
+      MasterKeyData key) {
+    this.oldMasterKeys.put(attempt, key); // in-memory map is updated first
+    try {
+      stateStore.storeNMTokenApplicationMasterKey(attempt,
+          key.getMasterKey());
+    } catch (IOException e) { // store failure is logged, not rethrown
+      LOG.error("Unable to store master key for application " + attempt, e);
+    }
+  }
+
+  private void removeAppAttemptKey(ApplicationAttemptId attempt) {
+    this.oldMasterKeys.remove(attempt); // dropped from memory first
+    try {
+      stateStore.removeNMTokenApplicationMasterKey(attempt);
+    } catch (IOException e) { // store failure is logged, not rethrown
+      LOG.error("Unable to remove master key for application " + attempt, e);
+    }
+  }
 }

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java
 Fri Jul 25 20:33:09 2014
@@ -72,7 +72,7 @@ public class NodePage extends NMView {
           ._("Total Pmem allocated for Container",
               StringUtils.byteDesc(info.getTotalPmemAllocated() * BYTES_IN_MB))
           ._("Pmem enforcement enabled",
-              info.isVmemCheckEnabled())
+              info.isPmemCheckEnabled())
            ._("Total VCores allocated for Containers",
               String.valueOf(info.getTotalVCoresAllocated())) 
           ._("NodeHealthyStatus",

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
 Fri Jul 25 20:33:09 2014
@@ -38,3 +38,9 @@ message LocalizedResourceProto {
   optional string localPath = 2;
   optional int64 size = 3;
 }
+
+message NMDBSchemaVersionProto {
+  optional int32 majorVersion = 1; // NOTE(review): presumably a major bump is incompatible - confirm
+  optional int32 minorVersion = 2; // NOTE(review): presumably a minor bump stays compatible - confirm
+}
+

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDefaultContainerExecutor.java
 Fri Jul 25 20:33:09 2014
@@ -18,16 +18,37 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileReader;
 import java.io.InputStream;
 import java.io.IOException;
+import java.io.LineNumberReader;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 
-import org.junit.Assert;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
@@ -45,15 +66,13 @@ import org.apache.hadoop.util.Progressab
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.FakeFSDataInputStream;
 
-import static org.apache.hadoop.fs.CreateFlag.*;
-
-
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.*;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Matchers;
-import static org.mockito.Mockito.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestDefaultContainerExecutor {
 
@@ -191,6 +210,92 @@ public class TestDefaultContainerExecuto
     }
   }
 
+  /**
+   * Launches a container that is expected to fail: the logged output and the
+   * container's diagnostics event must both name the error, exit code != 0.
+   */
+  @Test
+  public void testContainerLaunchError()
+      throws IOException, InterruptedException {
+    Path localDir = new Path(BASE_TMP_PATH, "localDir");
+    List<String> localDirs = new ArrayList<String>();
+    localDirs.add(localDir.toString());
+    List<String> logDirs = new ArrayList<String>();
+    Path logDir = new Path(BASE_TMP_PATH, "logDir");
+    logDirs.add(logDir.toString());
+
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.toString());
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.toString());
+
+    FileContext lfs = FileContext.getLocalFSFileContext(conf);
+    DefaultContainerExecutor mockExec = spy(new DefaultContainerExecutor(lfs));
+    mockExec.setConf(conf);
+    // whatever the executor passes to logOutput must carry the error text
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock)
+          throws Throwable {
+        String diagnostics = (String) invocationOnMock.getArguments()[0];
+        assertTrue("Invalid Diagnostics message: " + diagnostics,
+            diagnostics.contains("No such file or directory"));
+        return null;
+      }
+    }).when(mockExec).logOutput(any(String.class));
+
+    String appSubmitter = "nobody";
+    String appId = "APP_ID";
+    String containerId = "CONTAINER_ID";
+    Container container = mock(Container.class);
+    ContainerId cId = mock(ContainerId.class);
+    ContainerLaunchContext context = mock(ContainerLaunchContext.class);
+    HashMap<String, String> env = new HashMap<String, String>();
+
+    when(container.getContainerId()).thenReturn(cId);
+    when(container.getLaunchContext()).thenReturn(context);
+    try {
+      doAnswer(new Answer<Object>() {
+        @Override
+        public Object answer(InvocationOnMock invocationOnMock)
+            throws Throwable {
+          ContainerDiagnosticsUpdateEvent event =
+              (ContainerDiagnosticsUpdateEvent) invocationOnMock
+                  .getArguments()[0];
+          assertTrue("Invalid Diagnostics message: "
+                  + event.getDiagnosticsUpdate(),
+              event.getDiagnosticsUpdate().contains(
+                  "No such file or directory"));
+          return null;
+        }
+      }).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class));
+
+      when(cId.toString()).thenReturn(containerId);
+      when(cId.getApplicationAttemptId()).thenReturn(
+          ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 0));
+      when(context.getEnvironment()).thenReturn(env);
+
+      mockExec.createUserLocalDirs(localDirs, appSubmitter);
+      mockExec.createUserCacheDirs(localDirs, appSubmitter);
+      mockExec.createAppDirs(localDirs, appSubmitter, appId);
+      mockExec.createAppLogDirs(appId, logDirs);
+
+      Path scriptPath = new Path("file:///bin/echo");
+      Path tokensPath = new Path("file:///dev/null");
+      Path workDir = localDir;
+      Path pidFile = new Path(workDir, "pid.txt");
+      mockExec.init();
+      mockExec.activateContainer(cId, pidFile);
+      int ret = mockExec.launchContainer(container, scriptPath, tokensPath,
+          appSubmitter, appId, workDir, localDirs, localDirs);
+      // compare by value; assertNotSame would compare boxed references
+      assertTrue("Expected a non-zero exit code", ret != 0);
+    } finally {
+      mockExec.deleteAsUser(appSubmitter, localDir);
+      mockExec.deleteAsUser(appSubmitter, logDir);
+    }
+  }
+
 //  @Test
 //  public void testInit() throws IOException, InterruptedException {
 //    Configuration conf = new Configuration();

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutorWithMocks.java
 Fri Jul 25 20:33:09 2014
@@ -19,8 +19,12 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -34,8 +38,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -46,9 +48,13 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
+import org.junit.Assert;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestLinuxContainerExecutorWithMocks {
 
@@ -216,7 +222,19 @@ public class TestLinuxContainerExecutorW
     conf.set(YarnConfiguration.NM_LOCAL_DIRS, "file:///bin/echo");
     conf.set(YarnConfiguration.NM_LOG_DIRS, "file:///dev/null");
 
-    mockExec = new LinuxContainerExecutor();
+    mockExec = spy(new LinuxContainerExecutor());
+    doAnswer(
+        new Answer() {
+          @Override
+          public Object answer(InvocationOnMock invocationOnMock)
+              throws Throwable {
+             String diagnostics = (String) invocationOnMock.getArguments()[0];
+            assertTrue("Invalid Diagnostics message: " + diagnostics,
+                diagnostics.contains("badcommand"));
+            return null;
+          }
+        }
+    ).when(mockExec).logOutput(any(String.class));
     dirsHandler = new LocalDirsHandlerService();
     dirsHandler.init(conf);
     mockExec.setConf(conf);
@@ -233,7 +251,22 @@ public class TestLinuxContainerExecutorW
 
     when(container.getContainerId()).thenReturn(cId);
     when(container.getLaunchContext()).thenReturn(context);
-
+    doAnswer(
+        new Answer() {
+          @Override
+          public Object answer(InvocationOnMock invocationOnMock)
+              throws Throwable {
+            ContainerDiagnosticsUpdateEvent event =
+                (ContainerDiagnosticsUpdateEvent) invocationOnMock
+                    .getArguments()[0];
+            assertTrue("Invalid Diagnostics message: " +
+                event.getDiagnosticsUpdate(),
+                event.getDiagnosticsUpdate().contains("badcommand"));
+            return null;
+          }
+        }
+    ).when(container).handle(any(ContainerDiagnosticsUpdateEvent.class));
+    
     when(cId.toString()).thenReturn(containerId);
 
     when(context.getEnvironment()).thenReturn(env);

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
 Fri Jul 25 20:33:09 2014
@@ -25,14 +25,20 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 public class NMMemoryStateStoreService extends NMStateStoreService {
   private Map<TrackerKey, TrackerState> trackerStates;
   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
+  private RecoveredNMTokensState nmTokenState;
+  private RecoveredContainerTokensState containerTokenState;
 
   public NMMemoryStateStoreService() {
     super(NMMemoryStateStoreService.class.getName());
@@ -113,6 +119,11 @@ public class NMMemoryStateStoreService e
 
   @Override
   protected void initStorage(Configuration conf) {
+    nmTokenState = new RecoveredNMTokensState();
+    nmTokenState.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>();
+    containerTokenState = new RecoveredContainerTokensState();
+    containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
     trackerStates = new HashMap<TrackerKey, TrackerState>();
     deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
   }
@@ -148,6 +159,89 @@ public class NMMemoryStateStoreService e
   }
 
 
+  @Override
+  public RecoveredNMTokensState loadNMTokensState() throws IOException {
+    // snapshot: the map is copied so callers cannot alter this store's map
+    final RecoveredNMTokensState snapshot = new RecoveredNMTokensState();
+    final RecoveredNMTokensState src = nmTokenState;
+    snapshot.currentMasterKey = src.currentMasterKey;
+    snapshot.previousMasterKey = src.previousMasterKey;
+    snapshot.applicationMasterKeys =
+        new HashMap<ApplicationAttemptId, MasterKey>(src.applicationMasterKeys);
+    return snapshot;
+  }
+
+  @Override
+  public void storeNMTokenCurrentMasterKey(MasterKey key) throws IOException {
+    // keep our own copy, rebuilt from the PB-backed key's proto
+    nmTokenState.currentMasterKey =
+        new MasterKeyPBImpl(((MasterKeyPBImpl) key).getProto());
+  }
+
+  @Override
+  public void storeNMTokenPreviousMasterKey(MasterKey key) throws IOException {
+    // keep our own copy, rebuilt from the PB-backed key's proto
+    nmTokenState.previousMasterKey =
+        new MasterKeyPBImpl(((MasterKeyPBImpl) key).getProto());
+  }
+
+  @Override
+  public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt,
+      MasterKey key) throws IOException {
+    // remember a copy of the key (rebuilt from its proto) for this attempt
+    MasterKey copy = new MasterKeyPBImpl(((MasterKeyPBImpl) key).getProto());
+    nmTokenState.applicationMasterKeys.put(attempt, copy);
+  }
+
+  @Override // forgets the key stored for this attempt, if there is one
+  public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt)
+      throws IOException {
+    nmTokenState.applicationMasterKeys.remove(attempt);
+  }
+
+
+  @Override
+  public RecoveredContainerTokensState loadContainerTokensState()
+      throws IOException {
+    // snapshot: the token map is copied so callers cannot alter this store's
+    final RecoveredContainerTokensState src = containerTokenState;
+    final RecoveredContainerTokensState snapshot =
+        new RecoveredContainerTokensState();
+    snapshot.currentMasterKey = src.currentMasterKey;
+    snapshot.previousMasterKey = src.previousMasterKey;
+    snapshot.activeTokens = new HashMap<ContainerId, Long>(src.activeTokens);
+    return snapshot;
+  }
+
+  @Override
+  public void storeContainerTokenCurrentMasterKey(MasterKey key)
+      throws IOException {
+    // keep our own copy, rebuilt from the PB-backed key's proto
+    containerTokenState.currentMasterKey =
+        new MasterKeyPBImpl(((MasterKeyPBImpl) key).getProto());
+  }
+
+  @Override
+  public void storeContainerTokenPreviousMasterKey(MasterKey key)
+      throws IOException {
+    // keep our own copy, rebuilt from the PB-backed key's proto
+    containerTokenState.previousMasterKey =
+        new MasterKeyPBImpl(((MasterKeyPBImpl) key).getProto());
+  }
+
+  @Override // records (or overwrites) the expiration time for containerId
+  public void storeContainerToken(ContainerId containerId,
+      Long expirationTime) throws IOException {
+    containerTokenState.activeTokens.put(containerId, expirationTime);
+  }
+
+  @Override // forgets the token entry for containerId, if there is one
+  public void removeContainerToken(ContainerId containerId)
+      throws IOException {
+    containerTokenState.activeTokens.remove(containerId);
+  }
+
+
   private static class TrackerState {
     Map<Path, LocalResourceProto> inProgressMap =
         new HashMap<Path, LocalResourceProto>();

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
 Fri Jul 25 20:33:09 2014
@@ -20,15 +20,20 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -37,12 +42,20 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -97,6 +110,36 @@ public class TestNMLeveldbStateStoreServ
     assertTrue(stateStore.canRecover());
     verifyEmptyState();
   }
+  
+  @Test
+  public void testCheckVersion() throws IOException {
+    // default version
+    NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion();
+    Assert.assertEquals(defaultVersion, stateStore.loadVersion());
+
+    // compatible version
+    NMDBSchemaVersion compatibleVersion =
+        NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    stateStore.storeVersion(compatibleVersion);
+    Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
+    restartStateStore();
+    // overwrite the compatible version
+    Assert.assertEquals(defaultVersion, stateStore.loadVersion());
+
+    // incompatible version
+    NMDBSchemaVersion incompatibleVersion =
+      NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1,
+          defaultVersion.getMinorVersion());
+    stateStore.storeVersion(incompatibleVersion);
+    try {
+      restartStateStore();
+      Assert.fail("Incompatible version, should expect fail here.");
+    } catch (ServiceStateException e) {
+      Assert.assertTrue("Exception message mismatch", 
+        e.getMessage().contains("Incompatible version for NM state:"));
+    }
+  }
 
   @Test
   public void testStartResourceLocalization() throws IOException {
@@ -460,4 +503,159 @@ public class TestNMLeveldbStateStoreServ
     state = stateStore.loadDeletionServiceState();
     assertTrue(state.getTasks().isEmpty());
   }
+
+  @Test
+  public void testNMTokenStorage() throws IOException {
+    // test empty when no state
+    RecoveredNMTokensState state = stateStore.loadNMTokensState();
+    assertNull(state.getCurrentMasterKey());
+    assertNull(state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a master key and verify recovered
+    NMTokenSecretManagerForTest secretMgr = new NMTokenSecretManagerForTest();
+    MasterKey currentKey = secretMgr.generateKey();
+    stateStore.storeNMTokenCurrentMasterKey(currentKey);
+    restartStateStore();
+    state = stateStore.loadNMTokensState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertNull(state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a previous key and verify recovered
+    MasterKey prevKey = secretMgr.generateKey();
+    stateStore.storeNMTokenPreviousMasterKey(prevKey);
+    restartStateStore();
+    state = stateStore.loadNMTokensState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    assertTrue(state.getApplicationMasterKeys().isEmpty());
+
+    // store a few application keys and verify recovered
+    ApplicationAttemptId attempt1 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(1, 1), 1);
+    MasterKey attemptKey1 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt1, attemptKey1);
+    ApplicationAttemptId attempt2 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(2, 3), 4);
+    MasterKey attemptKey2 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
+    restartStateStore();
+    state = stateStore.loadNMTokensState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    Map<ApplicationAttemptId, MasterKey> loadedAppKeys =
+        state.getApplicationMasterKeys();
+    assertEquals(2, loadedAppKeys.size());
+    assertEquals(attemptKey1, loadedAppKeys.get(attempt1));
+    assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
+
+    // add/update/remove keys and verify recovered
+    ApplicationAttemptId attempt3 = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(5, 6), 7);
+    MasterKey attemptKey3 = secretMgr.generateKey();
+    stateStore.storeNMTokenApplicationMasterKey(attempt3, attemptKey3);
+    stateStore.removeNMTokenApplicationMasterKey(attempt1);
+    attemptKey2 = prevKey;
+    stateStore.storeNMTokenApplicationMasterKey(attempt2, attemptKey2);
+    prevKey = currentKey;
+    stateStore.storeNMTokenPreviousMasterKey(prevKey);
+    currentKey = secretMgr.generateKey();
+    stateStore.storeNMTokenCurrentMasterKey(currentKey);
+    restartStateStore();
+    state = stateStore.loadNMTokensState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    loadedAppKeys = state.getApplicationMasterKeys();
+    assertEquals(2, loadedAppKeys.size());
+    assertNull(loadedAppKeys.get(attempt1));
+    assertEquals(attemptKey2, loadedAppKeys.get(attempt2));
+    assertEquals(attemptKey3, loadedAppKeys.get(attempt3));
+  }
+
+  @Test
+  public void testContainerTokenStorage() throws IOException {
+    // test empty when no state
+    RecoveredContainerTokensState state =
+        stateStore.loadContainerTokensState();
+    assertNull(state.getCurrentMasterKey());
+    assertNull(state.getPreviousMasterKey());
+    assertTrue(state.getActiveTokens().isEmpty());
+
+    // store a master key and verify recovered
+    ContainerTokenKeyGeneratorForTest keygen =
+        new ContainerTokenKeyGeneratorForTest(new YarnConfiguration());
+    MasterKey currentKey = keygen.generateKey();
+    stateStore.storeContainerTokenCurrentMasterKey(currentKey);
+    restartStateStore();
+    state = stateStore.loadContainerTokensState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertNull(state.getPreviousMasterKey());
+    assertTrue(state.getActiveTokens().isEmpty());
+
+    // store a previous key and verify recovered
+    MasterKey prevKey = keygen.generateKey();
+    stateStore.storeContainerTokenPreviousMasterKey(prevKey);
+    restartStateStore();
+    state = stateStore.loadContainerTokensState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    assertTrue(state.getActiveTokens().isEmpty());
+
+    // store a few container tokens and verify recovered
+    ContainerId cid1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+    Long expTime1 = 1234567890L;
+    ContainerId cid2 = BuilderUtils.newContainerId(2, 2, 2, 2);
+    Long expTime2 = 9876543210L;
+    stateStore.storeContainerToken(cid1, expTime1);
+    stateStore.storeContainerToken(cid2, expTime2);
+    restartStateStore();
+    state = stateStore.loadContainerTokensState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    Map<ContainerId, Long> loadedActiveTokens =
+        state.getActiveTokens();
+    assertEquals(2, loadedActiveTokens.size());
+    assertEquals(expTime1, loadedActiveTokens.get(cid1));
+    assertEquals(expTime2, loadedActiveTokens.get(cid2));
+
+    // add/update/remove tokens and verify recovered
+    ContainerId cid3 = BuilderUtils.newContainerId(3, 3, 3, 3);
+    Long expTime3 = 135798642L;
+    stateStore.storeContainerToken(cid3, expTime3);
+    stateStore.removeContainerToken(cid1);
+    expTime2 += 246897531L;
+    stateStore.storeContainerToken(cid2, expTime2);
+    prevKey = currentKey;
+    stateStore.storeContainerTokenPreviousMasterKey(prevKey);
+    currentKey = keygen.generateKey();
+    stateStore.storeContainerTokenCurrentMasterKey(currentKey);
+    restartStateStore();
+    state = stateStore.loadContainerTokensState();
+    assertEquals(currentKey, state.getCurrentMasterKey());
+    assertEquals(prevKey, state.getPreviousMasterKey());
+    loadedActiveTokens = state.getActiveTokens();
+    assertEquals(2, loadedActiveTokens.size());
+    assertNull(loadedActiveTokens.get(cid1));
+    assertEquals(expTime2, loadedActiveTokens.get(cid2));
+    assertEquals(expTime3, loadedActiveTokens.get(cid3));
+  }
+
+  private static class NMTokenSecretManagerForTest extends
+      BaseNMTokenSecretManager {
+    public MasterKey generateKey() {
+      return createNewMasterKey().getMasterKey();
+    }
+  }
+
+  private static class ContainerTokenKeyGeneratorForTest extends
+      BaseContainerTokenSecretManager {
+    public ContainerTokenKeyGeneratorForTest(Configuration conf) {
+      super(conf);
+    }
+
+    public MasterKey generateKey() {
+      return createNewMasterKey().getMasterKey();
+    }
+  }
 }

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
 Fri Jul 25 20:33:09 2014
@@ -194,6 +194,21 @@
 
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-auth</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
       <groupId>com.sun.jersey.jersey-test-framework</groupId>
       <artifactId>jersey-test-framework-grizzly2</artifactId>
       <scope>test</scope>

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 Fri Jul 25 20:33:09 2014
@@ -1035,8 +1035,8 @@ public class ResourceManager extends Com
     StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
     try {
       Configuration conf = new YarnConfiguration();
-      // If -format, then delete RMStateStore; else startup normally
-      if (argv.length == 1 && argv[0].equals("-format")) {
+      // If -format-state-store, then delete RMStateStore; else startup normally
+      if (argv.length == 1 && argv[0].equals("-format-state-store")) {
         deleteRMStateStore(conf);
       } else {
         ResourceManager resourceManager = new ResourceManager();

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
 Fri Jul 25 20:33:09 2014
@@ -205,12 +205,6 @@ public class RMAppImpl implements RMApp,
     .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, 
         RMAppEventType.APP_RUNNING_ON_NODE,
         new AppRunningOnNodeTransition())
-    // ACCECPTED state can once again receive APP_ACCEPTED event, because on
-    // recovery the app returns ACCEPTED state and the app once again go
-    // through the scheduler and triggers one more APP_ACCEPTED event at
-    // ACCEPTED state.
-    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
-        RMAppEventType.APP_ACCEPTED)
 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -789,8 +783,18 @@ public class RMAppImpl implements RMApp,
         return app.recoveredFinalState;
       }
 
-      // Notify scheduler about the app on recovery
-      new AddApplicationToSchedulerTransition().transition(app, event);
+      // No existent attempts means the attempt associated with this app was not
+      // started or started but not yet saved.
+      if (app.attempts.isEmpty()) {
+        app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+          app.submissionContext.getQueue(), app.user));
+        return RMAppState.SUBMITTED;
+      }
+
+      // Add application to scheduler synchronously to guarantee scheduler
+      // knows applications before AM or NM re-registers.
+      app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
+        app.submissionContext.getQueue(), app.user, true));
 
       // recover attempts
       app.recoverAppAttempts();
@@ -805,12 +809,6 @@ public class RMAppImpl implements RMApp,
         return RMAppState.ACCEPTED;
       }
 
-      // No existent attempts means the attempt associated with this app was not
-      // started or started but not yet saved.
-      if (app.attempts.isEmpty()) {
-        return RMAppState.SUBMITTED;
-      }
-
       // YARN-1507 is saving the application state after the application is
       // accepted. So after YARN-1507, an app is saved meaning it is accepted.
       // Thus we return ACCECPTED state on recovery.

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
 Fri Jul 25 20:33:09 2014
@@ -774,11 +774,9 @@ public class RMAppAttemptImpl implements
       }
 
       // create AMRMToken
-      AMRMTokenIdentifier id =
-          new AMRMTokenIdentifier(appAttempt.applicationAttemptId);
       appAttempt.amrmToken =
-          new Token<AMRMTokenIdentifier>(id,
-            appAttempt.rmContext.getAMRMTokenSecretManager());
+          appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken(
+            appAttempt.applicationAttemptId);
 
       // Add the applicationAttempt to the scheduler and inform the scheduler
       // whether to transfer the state from previous attempt.
@@ -926,8 +924,10 @@ public class RMAppAttemptImpl implements
           appAttempt.masterService
               .registerAppAttempt(appAttempt.applicationAttemptId);
 
-          appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
-            appAttempt.getAppAttemptId(), false, false));
+          // Add attempt to scheduler synchronously to guarantee scheduler
+          // knows attempts before AM or NM re-registers.
+          appAttempt.scheduler.handle(new AppAttemptAddedSchedulerEvent(
+            appAttempt.getAppAttemptId(), false, true));
         }
 
         /*
@@ -1294,12 +1294,20 @@ public class RMAppAttemptImpl implements
   private String getAMContainerCrashedDiagnostics(
       RMAppAttemptContainerFinishedEvent finishEvent) {
     ContainerStatus status = finishEvent.getContainerStatus();
-    String diagnostics =
-        "AM Container for " + finishEvent.getApplicationAttemptId()
-        + " exited with " + " exitCode: " + status.getExitStatus() + ". "
-        + "Check application tracking page: " + this.getTrackingUrl()
-        + " . Then, click on links to logs of each attempt for detailed output. ";
-    return diagnostics;
+    StringBuilder diagnosticsBuilder = new StringBuilder();
+    diagnosticsBuilder.append("AM Container for ").append(
+      finishEvent.getApplicationAttemptId()).append(
+      " exited with ").append(" exitCode: ").append(status.getExitStatus()).
+      append("\n");
+    if (this.getTrackingUrl() != null) {
+      diagnosticsBuilder.append("For more detailed output,").append(
+        " check application tracking page:").append(
+        this.getTrackingUrl()).append(
+        "Then, click on links to logs of each attempt.\n");
+    }
+    diagnosticsBuilder.append("Diagnostics: ").append(status.getDiagnostics())
+        .append("Failing this attempt");
+    return diagnosticsBuilder.toString();
   }
 
   private static class FinalTransition extends BaseFinalTransition {

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
 Fri Jul 25 20:33:09 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
+import java.util.List;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 /**
@@ -73,5 +76,7 @@ public interface RMContainer extends Eve
   ContainerReport createContainerReport();
   
   boolean isAMContainer();
+  
+  List<ResourceRequest> getResourceRequests();
 
 }

Modified: 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1613514&r1=1613513&r2=1613514&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
 (original)
+++ 
hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
 Fri Jul 25 20:33:09 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 
 import java.util.EnumSet;
+import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -158,6 +160,7 @@ public class RMContainerImpl implements 
   private long finishTime;
   private ContainerStatus finishedStatus;
   private boolean isAMContainer;
+  private List<ResourceRequest> resourceRequests;
 
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -180,7 +183,8 @@ public class RMContainerImpl implements 
     this.eventHandler = rmContext.getDispatcher().getEventHandler();
     this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
     this.isAMContainer = false;
-    
+    this.resourceRequests = null;
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -311,6 +315,25 @@ public class RMContainerImpl implements 
       readLock.unlock();
     }
   }
+  
+  @Override
+  public List<ResourceRequest> getResourceRequests() {
+    try {
+      readLock.lock();
+      return resourceRequests;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  public void setResourceRequests(List<ResourceRequest> requests) {
+    try {
+      writeLock.lock();
+      this.resourceRequests = requests;
+    } finally {
+      writeLock.unlock();
+    }
+  }
 
   @Override
   public String toString() {
@@ -432,6 +455,9 @@ public class RMContainerImpl implements 
 
     @Override
     public void transition(RMContainerImpl container, RMContainerEvent event) {
+      // Clear ResourceRequest stored in RMContainer
+      container.setResourceRequests(null);
+      
       // Register with containerAllocationExpirer.
       container.containerAllocationExpirer.register(container.getContainerId());
 


Reply via email to