Author: vinodkv
Date: Sat Oct 19 18:14:14 2013
New Revision: 1533803

URL: http://svn.apache.org/r1533803
Log:
YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that prevent 
subsequent ResourceManager recovery. Contributed by Omkar Vinit Joshi.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
Removed:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1533803&r1=1533802&r2=1533803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sat Oct 19 18:14:14 2013
@@ -120,6 +120,9 @@ Release 2.2.1 - UNRELEASED
     YARN-1295. In UnixLocalWrapperScriptBuilder, using bash -c can cause Text
     file busy errors (Sandy Ryza)
 
+    YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that
+    prevent subsequent ResourceManager recovery. (Omkar Vinit Joshi via vinodkv)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1533803&r1=1533802&r2=1533803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Sat Oct 19 18:14:14 2013
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -118,6 +119,9 @@ public class FileSystemRMStateStore exte
         for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
           assert childNodeStatus.isFile();
           String childNodeName = childNodeStatus.getPath().getName();
+          if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+            continue;
+          }
           byte[] childData =
               readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
           if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
@@ -178,12 +182,28 @@ public class FileSystemRMStateStore exte
     }
   }
 
+  /*
+   * Returns true, after attempting to delete the file, if record is a
+   * partial entry left by an incomplete write; returns false otherwise.
+   */
+  private boolean checkAndRemovePartialRecord(Path record) throws IOException {
+    // writeFile() writes to "<name>.tmp" and renames it to "<name>" only
+    // after the write completes, so a name ending in .tmp marks a record
+    // that was never finished. It is logged and deleted here, and the
+    // callers skip it instead of parsing it.
+    if (record.getName().endsWith(".tmp")) {
+      LOG.error("incomplete rm state store entry found :"
+          + record);
+      // NOTE(review): the boolean result of delete() is ignored; if the
+      // delete fails, the file is simply skipped again on the next load.
+      fs.delete(record, false);
+      return true;
+    }
+    return false;
+  }
+
   private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
     FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
 
     for(FileStatus childNodeStatus : childNodes) {
       assert childNodeStatus.isFile();
       String childNodeName = childNodeStatus.getPath().getName();
+      if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+        continue;
+      }
       if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
         rmState.rmSecretManagerState.dtSequenceNumber =
             Integer.parseInt(childNodeName.split("_")[1]);
@@ -344,10 +364,19 @@ public class FileSystemRMStateStore exte
     return data;
   }
 
+  /*
+   * Writes data to outputPath so that a reader never sees a half-written
+   * record: the bytes are first written to "<outputPath>.tmp", and that file
+   * is then renamed to outputPath. This assumes rename is atomic on the
+   * underlying file system. A ".tmp" file that never got renamed (e.g.
+   * because the write failed) is deleted by checkAndRemovePartialRecord
+   * when the state is next loaded.
+   *
+   * Throws IOException if the temporary file cannot be renamed to
+   * outputPath.
+   */
   private void writeFile(Path outputPath, byte[] data) throws Exception {
-    FSDataOutputStream fsOut = fs.create(outputPath, false);
-    fsOut.write(data);
-    fsOut.close();
+    Path tempPath =
+        new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
+    FSDataOutputStream fsOut = fs.create(tempPath, false);
+    try {
+      fsOut.write(data);
+    } finally {
+      // Release the stream even when the write fails.
+      fsOut.close();
+    }
+    // FileSystem#rename reports failure through its return value rather
+    // than an exception; ignoring it would silently drop this record.
+    if (!fs.rename(tempPath, outputPath)) {
+      throw new IOException("Failed to rename " + tempPath + " to "
+          + outputPath);
+    }
   }
 
   private boolean renameFile(Path src, Path dst) throws Exception {

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1533803&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Sat Oct 19 18:14:14 2013
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import javax.crypto.SecretKey;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import org.apache.zookeeper.ZooKeeper;
+
+import org.junit.Test;
+
+/**
+ * Store-independent test scenarios for {@link RMStateStore}
+ * implementations. A subclass supplies an {@link RMStateStoreHelper} that
+ * creates the concrete store under test and inspects its backing storage.
+ * Extends ClientBaseWithFixes so that ZooKeeper-based subclasses can use the
+ * hostPort and createClient() it provides.
+ */
+public class RMStateStoreTestBase extends ClientBaseWithFixes {
+
+  public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
+
+  /**
+   * Dispatcher stand-in handed to the store under test. It asserts that each
+   * {@link RMAppAttemptStoredEvent} matches the expected attempt id and
+   * stored exception, then wakes up any thread blocked in waitNotify().
+   */
+  static class TestDispatcher implements
+      Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
+
+    ApplicationAttemptId attemptId;
+    Exception storedException;
+
+    // Written by handle() and polled by waitNotify(); volatile so the write
+    // is visible even when handle() runs on a thread other than the test's.
+    volatile boolean notified = false;
+
+    // No-op: getEventHandler() routes every event to this object's handle().
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void register(Class<? extends Enum> eventType,
+                         EventHandler handler) {
+    }
+
+    @Override
+    public void handle(RMAppAttemptStoredEvent event) {
+      assertEquals(attemptId, event.getApplicationAttemptId());
+      assertEquals(storedException, event.getStoredException());
+      notified = true;
+      synchronized (this) {
+        notifyAll();
+      }
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public EventHandler getEventHandler() {
+      return this;
+    }
+
+  }
+
+  /** Creates and inspects the concrete RMStateStore under test. */
+  interface RMStateStoreHelper {
+    /** Returns a store instance backed by the storage under test. */
+    RMStateStore getRMStateStore() throws Exception;
+    /** Checks the raw backing storage after a test scenario has finished. */
+    boolean isFinalStateValid() throws Exception;
+  }
+
+  /**
+   * Blocks until {@code dispatcher} reports a stored attempt, then clears its
+   * flag for the next wait. Fails the test after 60 seconds, or if the test
+   * thread is interrupted.
+   */
+  void waitNotify(TestDispatcher dispatcher) {
+    long startTime = System.currentTimeMillis();
+    while(!dispatcher.notified) {
+      synchronized (dispatcher) {
+        try {
+          dispatcher.wait(1000);
+        } catch (InterruptedException e) {
+          // Preserve the interrupt instead of swallowing it; retrying would
+          // only make wait() throw again immediately.
+          Thread.currentThread().interrupt();
+          fail("Interrupted while waiting for attempt store notification");
+        }
+      }
+      if(System.currentTimeMillis() - startTime > 1000*60) {
+        fail("Timed out attempt store notification");
+      }
+    }
+    dispatcher.notified = false;
+  }
+
+  /** Stores a mocked RMApp with the given id, submit time and user "test". */
+  void storeApp(
+      RMStateStore store, ApplicationId appId, long time) throws Exception {
+    ApplicationSubmissionContext context =
+        new ApplicationSubmissionContextPBImpl();
+    context.setApplicationId(appId);
+
+    RMApp mockApp = mock(RMApp.class);
+    when(mockApp.getApplicationId()).thenReturn(appId);
+    when(mockApp.getSubmitTime()).thenReturn(time);
+    when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
+    when(mockApp.getUser()).thenReturn("test");
+    store.storeApplication(mockApp);
+  }
+
+  /**
+   * Stores a mocked attempt, waits until the dispatcher confirms it, and
+   * returns the id of the attempt's master container.
+   */
+  ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
+      String containerIdStr, Token<AMRMTokenIdentifier> appToken,
+      SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
+      throws Exception {
+
+    Container container = new ContainerPBImpl();
+    container.setId(ConverterUtils.toContainerId(containerIdStr));
+    RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
+    when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
+    when(mockAttempt.getMasterContainer()).thenReturn(container);
+    when(mockAttempt.getAMRMToken()).thenReturn(appToken);
+    when(mockAttempt.getClientTokenMasterKey())
+        .thenReturn(clientTokenMasterKey);
+    dispatcher.attemptId = attemptId;
+    dispatcher.storedException = null;
+    store.storeApplicationAttempt(mockAttempt);
+    waitNotify(dispatcher);
+    return container.getId();
+  }
+
+  /**
+   * Stores one application with two attempts, plus a second application
+   * that is then removed. It reloads the state through a fresh store and
+   * checks the first application, its attempts, their AMRM tokens and
+   * client token master keys. Finally it asks the helper to validate the
+   * backing storage.
+   */
+  void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
+      throws Exception {
+    long submitTime = System.currentTimeMillis();
+    Configuration conf = new YarnConfiguration();
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    AMRMTokenSecretManager appTokenMgr =
+        new AMRMTokenSecretManager(conf);
+    ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+        new ClientToAMTokenSecretManagerInRM();
+
+    ApplicationAttemptId attemptId1 = ConverterUtils
+        .toApplicationAttemptId("appattempt_1352994193343_0001_000001");
+    ApplicationId appId1 = attemptId1.getApplicationId();
+    storeApp(store, appId1, submitTime);
+
+    // create application token and client token key for attempt1
+    Token<AMRMTokenIdentifier> appAttemptToken1 =
+        generateAMRMToken(attemptId1, appTokenMgr);
+    HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
+    attemptTokenSet1.add(appAttemptToken1);
+    SecretKey clientTokenKey1 =
+        clientToAMTokenMgr.createMasterKey(attemptId1);
+
+    ContainerId containerId1 = storeAttempt(store, attemptId1,
+          "container_1352994193343_0001_01_000001",
+          appAttemptToken1, clientTokenKey1, dispatcher);
+
+    String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
+    ApplicationAttemptId attemptId2 =
+        ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
+
+    // create application token and client token key for attempt2
+    Token<AMRMTokenIdentifier> appAttemptToken2 =
+        generateAMRMToken(attemptId2, appTokenMgr);
+    HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
+    attemptTokenSet2.add(appAttemptToken2);
+    SecretKey clientTokenKey2 =
+        clientToAMTokenMgr.createMasterKey(attemptId2);
+
+    ContainerId containerId2 = storeAttempt(store, attemptId2,
+          "container_1352994193343_0001_02_000001",
+          appAttemptToken2, clientTokenKey2, dispatcher);
+
+    // store a second application with one attempt, then remove it again
+    ApplicationAttemptId attemptIdRemoved = ConverterUtils
+        .toApplicationAttemptId("appattempt_1352994193343_0002_000001");
+    ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
+    storeApp(store, appIdRemoved, submitTime);
+    storeAttempt(store, attemptIdRemoved,
+        "container_1352994193343_0002_01_000001", null, null, dispatcher);
+
+    RMApp mockRemovedApp = mock(RMApp.class);
+    HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
+        new HashMap<ApplicationAttemptId, RMAppAttempt>();
+    ApplicationSubmissionContext context =
+        new ApplicationSubmissionContextPBImpl();
+    context.setApplicationId(appIdRemoved);
+    when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
+    when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
+    when(mockRemovedApp.getAppAttempts()).thenReturn(attempts);
+    RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class);
+    when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved);
+    attempts.put(attemptIdRemoved, mockRemovedAttempt);
+    store.removeApplication(mockRemovedApp);
+
+    // let things settle down (presumably gives the store time to finish
+    // processing the removal before it is closed)
+    Thread.sleep(1000);
+    store.close();
+
+    // load state
+    store = stateStoreHelper.getRMStateStore();
+    RMState state = store.loadState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        state.getApplicationState();
+
+    ApplicationState appState = rmAppState.get(appId1);
+    // app is loaded
+    assertNotNull(appState);
+    // app is loaded correctly
+    assertEquals(submitTime, appState.getSubmitTime());
+    // submission context is loaded correctly
+    assertEquals(appId1,
+        appState.getApplicationSubmissionContext().getApplicationId());
+    ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
+    // attempt1 is loaded correctly
+    assertNotNull(attemptState);
+    assertEquals(attemptId1, attemptState.getAttemptId());
+    // attempt1 container is loaded correctly
+    assertEquals(containerId1, attemptState.getMasterContainer().getId());
+    // attempt1 applicationToken is loaded correctly
+    HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
+    savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
+    assertEquals(attemptTokenSet1, savedTokens);
+    // attempt1 client token master key is loaded correctly
+    assertArrayEquals(clientTokenKey1.getEncoded(),
+        attemptState.getAppAttemptCredentials()
+        .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+
+    attemptState = appState.getAttempt(attemptId2);
+    // attempt2 is loaded correctly
+    assertNotNull(attemptState);
+    assertEquals(attemptId2, attemptState.getAttemptId());
+    // attempt2 container is loaded correctly
+    assertEquals(containerId2, attemptState.getMasterContainer().getId());
+    // attempt2 applicationToken is loaded correctly
+    savedTokens.clear();
+    savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
+    assertEquals(attemptTokenSet2, savedTokens);
+    // attempt2 client token master key is loaded correctly
+    assertArrayEquals(clientTokenKey2.getEncoded(),
+        attemptState.getAppAttemptCredentials()
+        .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+
+    // assert store is in expected state after everything is cleaned
+    assertTrue(stateStoreHelper.isFinalStateValid());
+
+    store.close();
+  }
+
+  /**
+   * Stores an RM delegation token with its sequence number and a delegation
+   * master key, then checks that loadState() returns the same token map,
+   * key set and sequence number.
+   */
+  public void testRMDTSecretManagerStateStore(
+      RMStateStoreHelper stateStoreHelper) throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    // store RM delegation token;
+    RMDelegationTokenIdentifier dtId1 =
+        new RMDelegationTokenIdentifier(new Text("owner1"),
+          new Text("renewer1"), new Text("realuser1"));
+    Long renewDate1 = Long.valueOf(System.currentTimeMillis());
+    int sequenceNumber = 1111;
+    store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
+      sequenceNumber);
+    Map<RMDelegationTokenIdentifier, Long> token1 =
+        new HashMap<RMDelegationTokenIdentifier, Long>();
+    token1.put(dtId1, renewDate1);
+
+    // store delegation key;
+    DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes());
+    HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
+    keySet.add(key);
+    store.storeRMDTMasterKey(key);
+
+    RMDTSecretManagerState secretManagerState =
+        store.loadState().getRMDTSecretManagerState();
+    Assert.assertEquals(token1, secretManagerState.getTokenState());
+    Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
+    Assert.assertEquals(sequenceNumber,
+        secretManagerState.getDTSequenceNumber());
+
+    // Release the store, as testRMAppStateStore does with its stores.
+    store.close();
+  }
+
+  /**
+   * Builds an AMRMToken for attemptId from appTokenMgr, with the service
+   * name "appToken service".
+   */
+  private Token<AMRMTokenIdentifier> generateAMRMToken(
+      ApplicationAttemptId attemptId,
+      AMRMTokenSecretManager appTokenMgr) {
+    AMRMTokenIdentifier appTokenId =
+        new AMRMTokenIdentifier(attemptId);
+    Token<AMRMTokenIdentifier> appToken =
+        new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
+    appToken.setService(new Text("appToken service"));
+    return appToken;
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1533803&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Sat Oct 19 18:14:14 2013
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.junit.Assert.assertTrue;
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Test;
+
+/**
+ * Runs the shared RMStateStoreTestBase scenarios against a
+ * {@link FileSystemRMStateStore} backed by a MiniDFSCluster. It also checks
+ * that a partially written ".tmp" record is removed when the state is
+ * loaded.
+ */
+public class TestFSRMStateStore extends RMStateStoreTestBase {
+
+  public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class);
+
+  /** Creates FileSystemRMStateStore instances rooted at /Test on the cluster. */
+  class TestFSRMStateStoreTester implements RMStateStoreHelper {
+
+    Path workingDirPathURI;
+    FileSystemRMStateStore store;
+    MiniDFSCluster cluster;
+
+    // Store subclass that checks the configured working path is used and
+    // that the file system is only opened by start(), not by init().
+    class TestFileSystemRMStore extends FileSystemRMStateStore {
+
+      TestFileSystemRMStore(Configuration conf) throws Exception {
+        init(conf);
+        Assert.assertNull(fs);
+        assertTrue(workingDirPathURI.equals(fsWorkingPath));
+        start();
+        Assert.assertNotNull(fs);
+      }
+    }
+
+    public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
+      Path workingDirPath = new Path("/Test");
+      this.cluster = cluster;
+      FileSystem fs = cluster.getFileSystem();
+      fs.mkdirs(workingDirPath);
+      Path clusterURI = new Path(cluster.getURI());
+      workingDirPathURI = new Path(clusterURI, workingDirPath);
+      fs.close();
+    }
+
+    @Override
+    public RMStateStore getRMStateStore() throws Exception {
+      YarnConfiguration conf = new YarnConfiguration();
+      conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
+          workingDirPathURI.toString());
+      this.store = new TestFileSystemRMStore(conf);
+      return store;
+    }
+
+    // Expects exactly one entry left under the working directory
+    // (presumably the store's root directory).
+    @Override
+    public boolean isFinalStateValid() throws Exception {
+      FileSystem fs = cluster.getFileSystem();
+      FileStatus[] files = fs.listStatus(workingDirPathURI);
+      return files.length == 1;
+    }
+  }
+
+  @Test
+  public void testFSRMStateStore() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      TestFSRMStateStoreTester fsTester =
+          new TestFSRMStateStoreTester(cluster);
+      // Plant a partial (".tmp") attempt record, like one left behind by a
+      // write that failed before its rename. Loading the state must discard
+      // the entry and remove it from the file system.
+      FileSystemRMStateStore fileSystemRMStateStore =
+          (FileSystemRMStateStore) fsTester.getRMStateStore();
+      String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
+      ApplicationAttemptId attemptId3 =
+          ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
+      Path rootDir =
+          new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
+      Path appRootDir = new Path(rootDir, "RMAppRoot");
+      Path appDir =
+          new Path(appRootDir, attemptId3.getApplicationId().toString());
+      Path tempAppAttemptFile =
+          new Path(appDir, attemptId3.toString() + ".tmp");
+      FSDataOutputStream fsOut =
+          fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
+      try {
+        fsOut.write("Some random data ".getBytes());
+      } finally {
+        fsOut.close();
+      }
+
+      testRMAppStateStore(fsTester);
+      Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
+          .getFileSystem(conf).exists(tempAppAttemptFile));
+      testRMDTSecretManagerStateStore(fsTester);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1533803&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Sat Oct 19 18:14:14 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+/**
+ * Runs the shared RMStateStoreTestBase scenarios against a
+ * {@link ZKRMStateStore} pointed at the hostPort inherited through
+ * RMStateStoreTestBase.
+ */
+public class TestZKRMStateStore extends RMStateStoreTestBase {
+
+  public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
+
+  /** Creates ZKRMStateStore instances rooted at the /Test znode. */
+  class TestZKRMStateStoreTester implements RMStateStoreHelper {
+
+    ZooKeeper client;
+    ZKRMStateStore store;
+
+    // Store subclass whose getNewZooKeeper() hands back the tester's client
+    // and which checks that the configured parent path is used.
+    class TestZKRMStateStoreInternal extends ZKRMStateStore {
+
+      public TestZKRMStateStoreInternal(Configuration conf,
+          String workingZnode) throws Exception {
+        init(conf);
+        start();
+        assertTrue(znodeWorkingPath.equals(workingZnode));
+      }
+
+      @Override
+      public ZooKeeper getNewZooKeeper() throws IOException {
+        return client;
+      }
+    }
+
+    @Override
+    public RMStateStore getRMStateStore() throws Exception {
+      String workingZnode = "/Test";
+      YarnConfiguration conf = new YarnConfiguration();
+      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
+      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+      this.client = createClient();
+      this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
+      return this.store;
+    }
+
+    // Expects exactly one child znode left under the store's working path.
+    @Override
+    public boolean isFinalStateValid() throws Exception {
+      List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
+      return nodes.size() == 1;
+    }
+  }
+
+  @Test
+  public void testZKRMStateStoreRealZK() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    testRMAppStateStore(zkTester);
+    testRMDTSecretManagerStateStore(zkTester);
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1533803&r1=1533802&r2=1533803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Sat Oct 19 18:14:14 2013
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher;
+import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
 import org.apache.hadoop.util.ZKUtil;
 
 import org.apache.zookeeper.CreateMode;
@@ -43,17 +43,20 @@ import static org.junit.Assert.fail;
 
 public class TestZKRMStateStoreZKClientConnections extends
     ClientBaseWithFixes {
+
   private static final int ZK_OP_WAIT_TIME = 3000;
   private Log LOG =
       LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
 
   class TestZKClient {
+
     ZKRMStateStore store;
     boolean forExpire = false;
     TestForwardingWatcher watcher;
     CyclicBarrier syncBarrier = new CyclicBarrier(2);
 
     protected class TestZKRMStateStore extends ZKRMStateStore {
+
       public TestZKRMStateStore(Configuration conf, String workingZnode)
           throws Exception {
         init(conf);
@@ -87,6 +90,7 @@ public class TestZKRMStateStoreZKClientC
 
     private class TestForwardingWatcher extends
         ClientBaseWithFixes.CountdownWatcher {
+
       public void process(WatchedEvent event) {
         super.process(event);
         try {
@@ -187,7 +191,7 @@ public class TestZKRMStateStoreZKClientC
     }
   }
 
-  @Test (timeout = 20000)
+  @Test(timeout = 20000)
   public void testSetZKAcl() {
     TestZKClient zkClientTester = new TestZKClient();
     YarnConfiguration conf = new YarnConfiguration();
@@ -196,10 +200,11 @@ public class TestZKRMStateStoreZKClientC
       zkClientTester.store.zkClient.delete(zkClientTester.store
           .znodeWorkingPath, -1);
       fail("Shouldn't be able to delete path");
-    } catch (Exception e) {/* expected behavior */}
+    } catch (Exception e) {/* expected behavior */
+    }
   }
 
-  @Test (timeout = 20000)
+  @Test(timeout = 20000)
   public void testInvalidZKAclConfiguration() {
     TestZKClient zkClientTester = new TestZKClient();
     YarnConfiguration conf = new YarnConfiguration();


Reply via email to