YARN-5400. Light cleanup in ZKRMStateStore (templedf via rkanter)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bcb2528a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bcb2528a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bcb2528a

Branch: refs/heads/HDFS-10467
Commit: bcb2528a51c33e4caff8d744c5e14c1accfc47d0
Parents: c3b235e
Author: Robert Kanter <rkan...@apache.org>
Authored: Wed Sep 28 14:56:41 2016 -0700
Committer: Robert Kanter <rkan...@apache.org>
Committed: Wed Sep 28 14:56:41 2016 -0700

----------------------------------------------------------------------
 .../yarn/server/resourcemanager/RMZKUtils.java  |  19 +-
 .../server/resourcemanager/ResourceManager.java |   2 +-
 .../recovery/ZKRMStateStore.java                | 260 +++++++++++--------
 3 files changed, 161 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcb2528a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
index d78068f..4b8561d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -37,9 +38,12 @@ public class RMZKUtils {
   private static final Log LOG = LogFactory.getLog(RMZKUtils.class);
 
   /**
-   * Utility method to fetch the ZK ACLs from the configuration
+   * Utility method to fetch the ZK ACLs from the configuration.
+   *
+   * @throws java.io.IOException if the Zookeeper ACLs configuration file
+   * cannot be read
    */
-  public static List<ACL> getZKAcls(Configuration conf) throws Exception {
+  public static List<ACL> getZKAcls(Configuration conf) throws IOException {
     // Parse authentication from configuration.
     String zkAclConf =
         conf.get(YarnConfiguration.RM_ZK_ACL,
@@ -47,17 +51,20 @@ public class RMZKUtils {
     try {
       zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
       return ZKUtil.parseACLs(zkAclConf);
-    } catch (Exception e) {
+    } catch (IOException | ZKUtil.BadAclFormatException e) {
       LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
       throw e;
     }
   }
 
   /**
-   * Utility method to fetch ZK auth info from the configuration
+   * Utility method to fetch ZK auth info from the configuration.
+   *
+   * @throws java.io.IOException if the Zookeeper ACLs configuration file
+   * cannot be read
    */
   public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
-      throws Exception {
+      throws IOException {
     String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
     try {
       zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
@@ -66,7 +73,7 @@ public class RMZKUtils {
       } else {
         return Collections.emptyList();
       }
-    } catch (Exception e) {
+    } catch (IOException | ZKUtil.BadAuthFormatException e) {
       LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
       throw e;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcb2528a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index bf72fc1..8a6997d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -320,7 +320,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   public CuratorFramework createAndStartCurator(Configuration conf)
-      throws Exception {
+      throws IOException {
     String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
     if (zkHostPort == null) {
       throw new YarnRuntimeException(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcb2528a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index c24b3e9..51bb74d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AM
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
@@ -68,8 +67,8 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.security.NoSuchAlgorithmException;
-import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -121,21 +120,18 @@ import java.util.List;
 @Private
 @Unstable
 public class ZKRMStateStore extends RMStateStore {
-
-  public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
-  private final SecureRandom random = new SecureRandom();
-
-  protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
-  protected static final Version CURRENT_VERSION_INFO = Version
-      .newInstance(1, 3);
+  private static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
   private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
       "RMDelegationTokensRoot";
   private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
       "RMDTSequentialNumber";
   private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
       "RMDTMasterKeysRoot";
+  protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
+  protected static final Version CURRENT_VERSION_INFO =
+      Version.newInstance(1, 3);
 
-  /** Znode paths */
+  /* Znode paths */
   private String zkRootNodePath;
   private String rmAppRoot;
   private String rmDTSecretManagerRoot;
@@ -144,44 +140,54 @@ public class ZKRMStateStore extends RMStateStore {
   private String dtSequenceNumberPath;
   private String amrmTokenSecretManagerRoot;
   private String reservationRoot;
+
   @VisibleForTesting
   protected String znodeWorkingPath;
 
-  /** Fencing related variables */
+  /* Fencing related variables */
   private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
   private String fencingNodePath;
   private Thread verifyActiveStatusThread;
   private int zkSessionTimeout;
 
-  /** ACL and auth info */
+  /* ACL and auth info */
   private List<ACL> zkAcl;
   @VisibleForTesting
   List<ACL> zkRootNodeAcl;
   private String zkRootNodeUsername;
-  public static final int CREATE_DELETE_PERMS =
+
+  private static final int CREATE_DELETE_PERMS =
       ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE;
   private final String zkRootNodeAuthScheme =
       new DigestAuthenticationProvider().getScheme();
 
   @VisibleForTesting
   protected CuratorFramework curatorFramework;
+
   /**
-   * Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
+   * Given the {@link Configuration} and {@link ACL}s used (sourceACLs) for
    * ZooKeeper access, construct the {@link ACL}s for the store's root node.
-   * In the constructed {@link ACL}, all the users allowed by zkAcl are given
-   * rwa access, while the current RM has exclude create-delete access.
+   * In the constructed {@link ACL}, all the users allowed by sourceACLs are
+   * given read-write-admin access, while the current RM has exclusive
+   * create-delete access.
    *
-   * To be called only when HA is enabled and the configuration doesn't set ACL
-   * for the root node.
+   * To be called only when HA is enabled and the configuration doesn't set an
+   * ACL for the root node.
+   * @param conf the configuration
+   * @param sourceACLs the source ACLs
+   * @return ACLs for the store's root node
+   * @throws java.security.NoSuchAlgorithmException thrown if the digest
+   * algorithm used by Zookeeper cannot be found
    */
   @VisibleForTesting
   @Private
   @Unstable
-  protected List<ACL> constructZkRootNodeACL(
-      Configuration conf, List<ACL> sourceACLs) throws NoSuchAlgorithmException {
-    List<ACL> zkRootNodeAcl = new ArrayList<>();
+  protected List<ACL> constructZkRootNodeACL(Configuration conf,
+      List<ACL> sourceACLs) throws NoSuchAlgorithmException {
+    List<ACL> zkRootNodeAclList = new ArrayList<>();
+
     for (ACL acl : sourceACLs) {
-      zkRootNodeAcl.add(new ACL(
+      zkRootNodeAclList.add(new ACL(
           ZKUtil.removeSpecificPerms(acl.getPerms(), CREATE_DELETE_PERMS),
           acl.getId()));
     }
@@ -190,15 +196,16 @@ public class ZKRMStateStore extends RMStateStore {
         YarnConfiguration.RM_ADDRESS,
         YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
     Id rmId = new Id(zkRootNodeAuthScheme,
-        DigestAuthenticationProvider.generateDigest(
-            zkRootNodeUsername + ":" + resourceManager.getZkRootNodePassword()));
-    zkRootNodeAcl.add(new ACL(CREATE_DELETE_PERMS, rmId));
-    return zkRootNodeAcl;
+        DigestAuthenticationProvider.generateDigest(zkRootNodeUsername + ":"
+            + resourceManager.getZkRootNodePassword()));
+    zkRootNodeAclList.add(new ACL(CREATE_DELETE_PERMS, rmId));
+
+    return zkRootNodeAclList;
   }
 
   @Override
-  public synchronized void initInternal(Configuration conf) throws Exception {
-
+  public synchronized void initInternal(Configuration conf)
+      throws IOException, NoSuchAlgorithmException {
     /* Initialize fencing related paths, acls, and ops */
     znodeWorkingPath =
         conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
@@ -210,16 +217,19 @@ public class ZKRMStateStore extends RMStateStore {
         YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
 
     zkAcl = RMZKUtils.getZKAcls(conf);
+
     if (HAUtil.isHAEnabled(conf)) {
       String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
           (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
+
       if (zkRootNodeAclConf != null) {
         zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf);
+
         try {
           zkRootNodeAcl = ZKUtil.parseACLs(zkRootNodeAclConf);
         } catch (ZKUtil.BadAclFormatException bafe) {
-          LOG.error("Invalid format for " +
-              YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL);
+          LOG.error("Invalid format for "
+              + YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL);
           throw bafe;
         }
       } else {
@@ -239,6 +249,7 @@ public class ZKRMStateStore extends RMStateStore {
         getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
     reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
     curatorFramework = resourceManager.getCurator();
+
     if (curatorFramework == null) {
       curatorFramework = resourceManager.createAndStartCurator(conf);
     }
@@ -246,7 +257,6 @@ public class ZKRMStateStore extends RMStateStore {
 
   @Override
   public synchronized void startInternal() throws Exception {
-
     // ensure root dirs exist
     createRootDirRecursively(znodeWorkingPath);
     create(zkRootNodePath);
@@ -272,9 +282,11 @@ public class ZKRMStateStore extends RMStateStore {
 
     StringBuilder builder = new StringBuilder();
     builder.append(prefix);
+
     for (ACL acl : getAcls) {
       builder.append(acl.toString());
     }
+
     builder.append(getStat.toString());
     LOG.debug(builder.toString());
   }
@@ -301,6 +313,7 @@ public class ZKRMStateStore extends RMStateStore {
       verifyActiveStatusThread.interrupt();
       verifyActiveStatusThread.join(1000);
     }
+
     if (!HAUtil.isHAEnabled(getConfig())) {
       IOUtils.closeStream(curatorFramework);
     }
@@ -316,6 +329,7 @@ public class ZKRMStateStore extends RMStateStore {
     String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
     byte[] data =
         ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
+
     if (exists(versionNodePath)) {
       safeSetData(versionNodePath, data, -1);
     } else {
@@ -331,6 +345,7 @@ public class ZKRMStateStore extends RMStateStore {
       byte[] data = getData(versionNodePath);
       return new VersionPBImpl(VersionProto.parseFrom(data));
     }
+
     return null;
   }
 
@@ -338,6 +353,7 @@ public class ZKRMStateStore extends RMStateStore {
   public synchronized long getAndIncrementEpoch() throws Exception {
     String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
     long currentEpoch = 0;
+
     if (exists(epochNodePath)) {
       // load current epoch
       byte[] data = getData(epochNodePath);
@@ -353,6 +369,7 @@ public class ZKRMStateStore extends RMStateStore {
           .toByteArray();
       safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
     }
+
     return currentEpoch;
   }
 
@@ -367,31 +384,37 @@ public class ZKRMStateStore extends RMStateStore {
     loadAMRMTokenSecretManagerState(rmState);
     // recover reservation state
     loadReservationSystemState(rmState);
+
     return rmState;
   }
 
   private void loadReservationSystemState(RMState rmState) throws Exception {
     List<String> planNodes = getChildren(reservationRoot);
+
     for (String planName : planNodes) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Loading plan from znode: " + planName);
       }
-      String planNodePath = getNodePath(reservationRoot, planName);
 
+      String planNodePath = getNodePath(reservationRoot, planName);
       List<String> reservationNodes = getChildren(planNodePath);
+
       for (String reservationNodeName : reservationNodes) {
-        String reservationNodePath = getNodePath(planNodePath,
-            reservationNodeName);
+        String reservationNodePath =
+            getNodePath(planNodePath, reservationNodeName);
+
         if (LOG.isDebugEnabled()) {
           LOG.debug("Loading reservation from znode: " + reservationNodePath);
         }
+
         byte[] reservationData = getData(reservationNodePath);
         ReservationAllocationStateProto allocationState =
             ReservationAllocationStateProto.parseFrom(reservationData);
+
         if (!rmState.getReservationState().containsKey(planName)) {
-          rmState.getReservationState().put(planName,
-              new HashMap<ReservationId, ReservationAllocationStateProto>());
+          rmState.getReservationState().put(planName, new HashMap<>());
         }
+
         ReservationId reservationId =
             ReservationId.parseReservationId(reservationNodeName);
         rmState.getReservationState().get(planName).put(reservationId,
@@ -403,16 +426,17 @@ public class ZKRMStateStore extends RMStateStore {
   private void loadAMRMTokenSecretManagerState(RMState rmState)
       throws Exception {
     byte[] data = getData(amrmTokenSecretManagerRoot);
+
     if (data == null) {
       LOG.warn("There is no data saved");
-      return;
+    } else {
+      AMRMTokenSecretManagerStatePBImpl stateData =
+          new AMRMTokenSecretManagerStatePBImpl(
+            AMRMTokenSecretManagerStateProto.parseFrom(data));
+      rmState.amrmTokenSecretManagerState =
+          AMRMTokenSecretManagerState.newInstance(
+            stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
     }
-    AMRMTokenSecretManagerStatePBImpl stateData =
-        new AMRMTokenSecretManagerStatePBImpl(
-          AMRMTokenSecretManagerStateProto.parseFrom(data));
-    rmState.amrmTokenSecretManagerState =
-        AMRMTokenSecretManagerState.newInstance(
-          stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
   }
 
   private synchronized void loadRMDTSecretManagerState(RMState rmState)
@@ -423,8 +447,8 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   private void loadRMDelegationKeyState(RMState rmState) throws Exception {
-    List<String> childNodes =
-        getChildren(dtMasterKeysRootPath);
+    List<String> childNodes = getChildren(dtMasterKeysRootPath);
+
     for (String childNodeName : childNodes) {
       String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
       byte[] childData = getData(childNodePath);
@@ -435,34 +459,30 @@ public class ZKRMStateStore extends RMStateStore {
       }
 
       ByteArrayInputStream is = new ByteArrayInputStream(childData);
-      DataInputStream fsIn = new DataInputStream(is);
 
-      try {
+      try (DataInputStream fsIn = new DataInputStream(is)) {
         if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
           DelegationKey key = new DelegationKey();
           key.readFields(fsIn);
           rmState.rmSecretManagerState.masterKeyState.add(key);
+
           if (LOG.isDebugEnabled()) {
             LOG.debug("Loaded delegation key: keyId=" + key.getKeyId()
                 + ", expirationDate=" + key.getExpiryDate());
           }
         }
-      } finally {
-        is.close();
       }
     }
   }
 
   private void loadRMSequentialNumberState(RMState rmState) throws Exception {
     byte[] seqData = getData(dtSequenceNumberPath);
+
     if (seqData != null) {
       ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
-      DataInputStream seqIn = new DataInputStream(seqIs);
 
-      try {
+      try (DataInputStream seqIn = new DataInputStream(seqIs)) {
         rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt();
-      } finally {
-        seqIn.close();
       }
     }
   }
@@ -470,6 +490,7 @@ public class ZKRMStateStore extends RMStateStore {
   private void loadRMDelegationTokenState(RMState rmState) throws Exception {
     List<String> childNodes =
         getChildren(delegationTokensRootPath);
+
     for (String childNodeName : childNodes) {
       String childNodePath =
           getNodePath(delegationTokensRootPath, childNodeName);
@@ -481,9 +502,8 @@ public class ZKRMStateStore extends RMStateStore {
       }
 
       ByteArrayInputStream is = new ByteArrayInputStream(childData);
-      DataInputStream fsIn = new DataInputStream(is);
 
-      try {
+      try (DataInputStream fsIn = new DataInputStream(is)) {
         if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
           RMDelegationTokenIdentifierData identifierData =
               new RMDelegationTokenIdentifierData();
@@ -493,36 +513,40 @@ public class ZKRMStateStore extends RMStateStore {
           long renewDate = identifierData.getRenewDate();
           rmState.rmSecretManagerState.delegationTokenState.put(identifier,
               renewDate);
+
           if (LOG.isDebugEnabled()) {
             LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
                 + " renewDate=" + renewDate);
           }
         }
-      } finally {
-        is.close();
       }
     }
   }
 
   private synchronized void loadRMAppState(RMState rmState) throws Exception {
     List<String> childNodes = getChildren(rmAppRoot);
+
     for (String childNodeName : childNodes) {
       String childNodePath = getNodePath(rmAppRoot, childNodeName);
       byte[] childData = getData(childNodePath);
+
       if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
         // application
         if (LOG.isDebugEnabled()) {
           LOG.debug("Loading application from znode: " + childNodeName);
         }
+
         ApplicationId appId = ApplicationId.fromString(childNodeName);
         ApplicationStateDataPBImpl appState =
             new ApplicationStateDataPBImpl(
                 ApplicationStateDataProto.parseFrom(childData));
+
         if (!appId.equals(
             appState.getApplicationSubmissionContext().getApplicationId())) {
-          throw new YarnRuntimeException("The child node name is different " +
-              "from the application id");
+          throw new YarnRuntimeException("The child node name is different "
+              + "from the application id");
         }
+
         rmState.appState.put(appId, appState);
         loadApplicationAttemptState(appState, appId);
       } else {
@@ -536,6 +560,7 @@ public class ZKRMStateStore extends RMStateStore {
       throws Exception {
     String appPath = getNodePath(rmAppRoot, appId.toString());
     List<String> attempts = getChildren(appPath);
+
     for (String attemptIDStr : attempts) {
       if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
         String attemptPath = getNodePath(appPath, attemptIDStr);
@@ -548,6 +573,7 @@ public class ZKRMStateStore extends RMStateStore {
         appState.attempts.put(attemptState.getAttemptId(), attemptState);
       }
     }
+
     LOG.debug("Done loading applications from ZK state store");
   }
 
@@ -559,21 +585,23 @@ public class ZKRMStateStore extends RMStateStore {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
     }
+
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
     safeCreate(nodeCreatePath, appStateData, zkAcl,
         CreateMode.PERSISTENT);
-
   }
 
   @Override
-  public synchronized void updateApplicationStateInternal(ApplicationId appId,
-      ApplicationStateData appStateDataPB) throws Exception {
+  protected synchronized void updateApplicationStateInternal(
+      ApplicationId appId, ApplicationStateData appStateDataPB)
+      throws Exception {
     String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing final state info for app: " + appId + " at: "
           + nodeUpdatePath);
     }
+
     byte[] appStateData = appStateDataPB.getProto().toByteArray();
 
     if (exists(nodeUpdatePath)) {
@@ -587,7 +615,7 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   @Override
-  public synchronized void storeApplicationAttemptStateInternal(
+  protected synchronized void storeApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
       ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
@@ -599,13 +627,13 @@ public class ZKRMStateStore extends RMStateStore {
       LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
           + nodeCreatePath);
     }
+
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
-    safeCreate(nodeCreatePath, attemptStateData, zkAcl,
-        CreateMode.PERSISTENT);
+    safeCreate(nodeCreatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT);
   }
 
   @Override
-  public synchronized void updateApplicationAttemptStateInternal(
+  protected synchronized void updateApplicationAttemptStateInternal(
       ApplicationAttemptId appAttemptId,
       ApplicationAttemptStateData attemptStateDataPB)
       throws Exception {
@@ -613,10 +641,12 @@ public class ZKRMStateStore extends RMStateStore {
     String appAttemptIdStr = appAttemptId.toString();
     String appDirPath = getNodePath(rmAppRoot, appIdStr);
     String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
           + " at: " + nodeUpdatePath);
     }
+
     byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
 
     if (exists(nodeUpdatePath)) {
@@ -630,25 +660,24 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   @Override
-  public synchronized void removeApplicationAttemptInternal(
-      ApplicationAttemptId appAttemptId)
-      throws Exception {
+  protected synchronized void removeApplicationAttemptInternal(
+      ApplicationAttemptId appAttemptId) throws Exception {
     String appId = appAttemptId.getApplicationId().toString();
     String appIdRemovePath = getNodePath(rmAppRoot, appId);
-    String attemptIdRemovePath = getNodePath(appIdRemovePath,
-        appAttemptId.toString());
+    String attemptIdRemovePath =
+        getNodePath(appIdRemovePath, appAttemptId.toString());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing info for attempt: " + appAttemptId + " at: "
           + attemptIdRemovePath);
     }
+
     safeDelete(attemptIdRemovePath);
   }
 
   @Override
-  public synchronized void removeApplicationStateInternal(
-      ApplicationStateData  appState)
-      throws Exception {
+  protected synchronized void removeApplicationStateInternal(
+      ApplicationStateData appState) throws Exception {
     String appId = appState.getApplicationSubmissionContext().getApplicationId()
         .toString();
     String appIdRemovePath = getNodePath(rmAppRoot, appId);
@@ -659,9 +688,11 @@ public class ZKRMStateStore extends RMStateStore {
     }
 
     for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
-      String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
+      String attemptRemovePath =
+          getNodePath(appIdRemovePath, attemptId.toString());
       safeDelete(attemptRemovePath);
     }
+
     safeDelete(appIdRemovePath);
   }
 
@@ -680,10 +711,12 @@ public class ZKRMStateStore extends RMStateStore {
     String nodeRemovePath =
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationToken_"
           + rmDTIdentifier.getSequenceNumber());
     }
+
     safeDelete(nodeRemovePath);
   }
 
@@ -695,6 +728,7 @@ public class ZKRMStateStore extends RMStateStore {
     String nodeRemovePath =
         getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
             + rmDTIdentifier.getSequenceNumber());
+
     if (exists(nodeRemovePath)) {
       // in case znode exists
       addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, true);
@@ -703,6 +737,7 @@ public class ZKRMStateStore extends RMStateStore {
       addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
       LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
     }
+
     trx.commit();
   }
 
@@ -710,17 +745,16 @@ public class ZKRMStateStore extends RMStateStore {
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
       boolean isUpdate) throws Exception {
     // store RM delegation token
-    String nodeCreatePath =
-        getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
-            + rmDTIdentifier.getSequenceNumber());
-    ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
-    DataOutputStream seqOut = new DataOutputStream(seqOs);
+    String nodeCreatePath = getNodePath(delegationTokensRootPath,
+        DELEGATION_TOKEN_PREFIX + rmDTIdentifier.getSequenceNumber());
     RMDelegationTokenIdentifierData identifierData =
         new RMDelegationTokenIdentifierData(rmDTIdentifier, renewDate);
-    try {
+    ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
+
+    try (DataOutputStream seqOut = new DataOutputStream(seqOs)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_" +
-            rmDTIdentifier.getSequenceNumber());
+        LOG.debug((isUpdate ? "Storing " : "Updating ") + "RMDelegationToken_"
+            + rmDTIdentifier.getSequenceNumber());
       }
 
       if (isUpdate) {
@@ -730,24 +764,23 @@ public class ZKRMStateStore extends RMStateStore {
             CreateMode.PERSISTENT);
         // Update Sequence number only while storing DT
         seqOut.writeInt(rmDTIdentifier.getSequenceNumber());
+
         if (LOG.isDebugEnabled()) {
-          LOG.debug((isUpdate ? "Storing " : "Updating ") +
-              dtSequenceNumberPath + ". SequenceNumber: "
+          LOG.debug((isUpdate ? "Storing " : "Updating ")
+              + dtSequenceNumberPath + ". SequenceNumber: "
               + rmDTIdentifier.getSequenceNumber());
         }
+
         trx.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1);
       }
-    } finally {
-      seqOs.close();
     }
   }
 
   @Override
   protected synchronized void storeRMDTMasterKeyState(
       DelegationKey delegationKey) throws Exception {
-    String nodeCreatePath =
-        getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
-            + delegationKey.getKeyId());
+    String nodeCreatePath = getNodePath(dtMasterKeysRootPath,
+        DELEGATION_KEY_PREFIX + delegationKey.getKeyId());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Storing RMDelegationKey_" + delegationKey.getKeyId());
     }
@@ -765,9 +798,11 @@ public class ZKRMStateStore extends RMStateStore {
     String nodeRemovePath =
         getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
             + delegationKey.getKeyId());
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
     }
+
     safeDelete(nodeRemovePath);
   }
 
@@ -789,30 +824,31 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   @Override
-  public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
+  protected synchronized void storeOrUpdateAMRMTokenSecretManagerState(
       AMRMTokenSecretManagerState amrmTokenSecretManagerState, boolean isUpdate)
       throws Exception {
     AMRMTokenSecretManagerState data =
         AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
     byte[] stateData = data.getProto().toByteArray();
+
     safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
   }
 
   @Override
   protected synchronized void removeReservationState(String planName,
-      String reservationIdName)
-      throws Exception {
-    String planNodePath =
-        getNodePath(reservationRoot, planName);
-    String reservationPath = getNodePath(planNodePath,
-        reservationIdName);
+      String reservationIdName) throws Exception {
+    String planNodePath = getNodePath(reservationRoot, planName);
+    String reservationPath = getNodePath(planNodePath, reservationIdName);
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing reservationallocation " + reservationIdName + " for" +
-          " plan " + planName);
+      LOG.debug("Removing reservationallocation " + reservationIdName
+          + " for" + " plan " + planName);
     }
+
     safeDelete(reservationPath);
 
     List<String> reservationNodes = getChildren(planNodePath);
+
     if (reservationNodes.isEmpty()) {
       safeDelete(planNodePath);
     }
@@ -821,11 +857,10 @@ public class ZKRMStateStore extends RMStateStore {
   @Override
   protected synchronized void storeReservationState(
       ReservationAllocationStateProto reservationAllocation, String planName,
-      String reservationIdName)
-      throws Exception {
+      String reservationIdName) throws Exception {
     SafeTransaction trx = new SafeTransaction();
-    addOrUpdateReservationState(
-        reservationAllocation, planName, reservationIdName, trx, false);
+    addOrUpdateReservationState(reservationAllocation, planName,
+        reservationIdName, trx, false);
     trx.commit();
   }
 
@@ -843,6 +878,7 @@ public class ZKRMStateStore extends RMStateStore {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Creating plan node: " + planName + " at: " + planCreatePath);
       }
+
       trx.create(planCreatePath, null, zkAcl, CreateMode.PERSISTENT);
     }
 
@@ -871,6 +907,7 @@ public class ZKRMStateStore extends RMStateStore {
     Preconditions.checkArgument(pathParts.length >= 1 && pathParts[0].isEmpty(),
         "Invalid path: %s", path);
     StringBuilder sb = new StringBuilder();
+
     for (int i = 1; i < pathParts.length; i++) {
       sb.append("/").append(pathParts[i]);
       create(sb.toString());
@@ -947,10 +984,9 @@ public class ZKRMStateStore extends RMStateStore {
 
     SafeTransaction() throws Exception {
       CuratorTransaction transaction = curatorFramework.inTransaction();
-      transactionFinal =
-          transaction.create()
-              .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
-              .forPath(fencingNodePath, new byte[0]).and();
+      transactionFinal = transaction.create()
+          .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
+          .forPath(fencingNodePath, new byte[0]).and();
     }
 
     public void commit() throws Exception {
@@ -985,19 +1021,17 @@ public class ZKRMStateStore extends RMStateStore {
       super(VerifyActiveStatusThread.class.getName());
     }
 
+    @Override
     public void run() {
       try {
-        while (true) {
-          if(isFencedState()) {
-            break;
-          }
+        while (!isFencedState()) {
           // Create and delete fencing node
           new SafeTransaction().commit();
           Thread.sleep(zkSessionTimeout);
         }
       } catch (InterruptedException ie) {
-        LOG.info(VerifyActiveStatusThread.class.getName() + " thread " +
-            "interrupted! Exiting!");
+        LOG.info(getName() + " thread interrupted! Exiting!");
+        interrupt();
       } catch (Exception e) {
         notifyStoreOperationFailed(new StoreFencedException());
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to