Author: vinodkv
Date: Sat Oct 19 18:14:14 2013
New Revision: 1533803
URL: http://svn.apache.org/r1533803
Log:
YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that prevent
subsequent ResourceManager recovery. Contributed by Omkar Vinit Joshi.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
Removed:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1533803&r1=1533802&r2=1533803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sat Oct 19 18:14:14 2013
@@ -120,6 +120,9 @@ Release 2.2.1 - UNRELEASED
YARN-1295. In UnixLocalWrapperScriptBuilder, using bash -c can cause Text
file busy errors (Sandy Ryza)
+ YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that
+ prevent subsequent ResourceManager recovery. (Omkar Vinit Joshi via vinodkv)
+
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1533803&r1=1533802&r2=1533803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Sat Oct 19 18:14:14 2013
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -118,6 +119,9 @@ public class FileSystemRMStateStore exte
for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
+ if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+ continue;
+ }
byte[] childData =
readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
@@ -178,12 +182,28 @@ public class FileSystemRMStateStore exte
}
}
+  /**
+   * Detects and removes a partially written state-store record.
+   *
+   * writeFile() first writes to "<name>.tmp" and only then renames the file
+   * into place, so a child whose name ends in ".tmp" is the leftover of a
+   * write that never reached the rename. Such a file must not be parsed as a
+   * record; it is deleted instead.
+   *
+   * @param record path of the child file currently being loaded
+   * @return true if the record was a partial ".tmp" file and the caller
+   *         should skip it; false if it is a regular record
+   * @throws IOException if the delete call fails with an I/O error
+   */
+  private boolean checkAndRemovePartialRecord(Path record) throws IOException {
+    // If the file ends with .tmp then it shows that it failed
+    // during saving state into state store. The file will be deleted as a
+    // part of this call
+    if (record.getName().endsWith(".tmp")) {
+      LOG.error("incomplete rm state store entry found :"
+          + record);
+      if (!fs.delete(record, false)) {
+        // Best-effort cleanup: the partial record is skipped either way, and
+        // the next state load runs this check again on whatever is left.
+        LOG.warn("Failed to delete incomplete rm state store entry: "
+            + record);
+      }
+      return true;
+    }
+    return false;
+  }
+
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
+ if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+ continue;
+ }
if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
rmState.rmSecretManagerState.dtSequenceNumber =
Integer.parseInt(childNodeName.split("_")[1]);
@@ -344,10 +364,19 @@ public class FileSystemRMStateStore exte
return data;
}
+  /**
+   * Writes data to outputPath without ever exposing a partially written
+   * record under the final name. The bytes are written to "<name>.tmp" in the
+   * same directory, and that file is then renamed onto outputPath; this
+   * assumes rename is atomic on the underlying file system. A leftover ".tmp"
+   * file (for example after a crash before the rename) is discarded by
+   * checkAndRemovePartialRecord() when state is loaded.
+   *
+   * @param outputPath final location of the record
+   * @param data serialized record contents
+   * @throws IOException if the rename reports failure, so the record is not
+   *         silently lost
+   * @throws Exception propagated from the underlying file system calls
+   */
   private void writeFile(Path outputPath, byte[] data) throws Exception {
-    FSDataOutputStream fsOut = fs.create(outputPath, false);
-    fsOut.write(data);
-    fsOut.close();
+    Path tempPath =
+        new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
+    FSDataOutputStream fsOut = fs.create(tempPath, false);
+    try {
+      fsOut.write(data);
+    } finally {
+      // Release the stream even if the write fails.
+      fsOut.close();
+    }
+    // FileSystem.rename signals many failures by returning false rather than
+    // throwing; ignoring it would drop the record and leave only the .tmp.
+    if (!fs.rename(tempPath, outputPath)) {
+      throw new IOException("Failed to rename " + tempPath + " to "
+          + outputPath);
+    }
   }
private boolean renameFile(Path src, Path dst) throws Exception {
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1533803&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Sat Oct 19 18:14:14 2013
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import javax.crypto.SecretKey;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
+import
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
+import
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
+import
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import org.apache.zookeeper.ZooKeeper;
+
+import org.junit.Test;
+
+/**
+ * Shared store/reload scenarios run against each RMStateStore implementation
+ * through an {@link RMStateStoreHelper}.
+ */
+public class RMStateStoreTestBase extends ClientBaseWithFixes{
+
+  public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
+
+  /**
+   * Stub dispatcher handed to the store under test. It checks each
+   * RMAppAttemptStoredEvent against the expected attempt id and exception,
+   * then wakes up any thread blocked in waitNotify().
+   */
+  static class TestDispatcher implements
+      Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
+
+    ApplicationAttemptId attemptId;
+    Exception storedException;
+
+    // Set by handle() and cleared by waitNotify(), which may run on different
+    // threads; volatile plus the monitor updates keep the flag visible.
+    volatile boolean notified = false;
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void register(Class<? extends Enum> eventType,
+        EventHandler handler) {
+    }
+
+    @Override
+    public void handle(RMAppAttemptStoredEvent event) {
+      assertEquals(attemptId, event.getApplicationAttemptId());
+      assertEquals(storedException, event.getStoredException());
+      // Set the flag under the same monitor the waiter checks it with, so the
+      // notifyAll() cannot fall between the waiter's check and its wait().
+      synchronized (this) {
+        notified = true;
+        notifyAll();
+      }
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public EventHandler getEventHandler() {
+      return this;
+    }
+
+  }
+
+  /** Supplies the store under test and validates its final persisted state. */
+  interface RMStateStoreHelper {
+    RMStateStore getRMStateStore() throws Exception;
+    boolean isFinalStateValid() throws Exception;
+  }
+
+  /**
+   * Blocks until the dispatcher has seen a stored-attempt event, then resets
+   * the flag for the next wait. Fails the test if no event arrives within 60
+   * seconds or if the waiting thread is interrupted.
+   */
+  void waitNotify(TestDispatcher dispatcher) {
+    long startTime = System.currentTimeMillis();
+    synchronized (dispatcher) {
+      while (!dispatcher.notified) {
+        try {
+          dispatcher.wait(1000);
+        } catch (InterruptedException e) {
+          // Keep the interrupt visible to the caller instead of swallowing it.
+          Thread.currentThread().interrupt();
+          fail("Interrupted while waiting for attempt store notification");
+        }
+        if (System.currentTimeMillis() - startTime > 1000 * 60) {
+          fail("Timed out attempt store notification");
+        }
+      }
+      dispatcher.notified = false;
+    }
+  }
+
+  /** Stores a mocked application with the given id and submit time. */
+  void storeApp(
+      RMStateStore store, ApplicationId appId, long time) throws Exception {
+    ApplicationSubmissionContext context =
+        new ApplicationSubmissionContextPBImpl();
+    context.setApplicationId(appId);
+
+    RMApp mockApp = mock(RMApp.class);
+    when(mockApp.getApplicationId()).thenReturn(appId);
+    when(mockApp.getSubmitTime()).thenReturn(time);
+    when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
+    when(mockApp.getUser()).thenReturn("test");
+    store.storeApplication(mockApp);
+  }
+
+  /**
+   * Stores a mocked attempt, waits for the dispatcher to confirm it and
+   * returns the id of the attempt's master container.
+   */
+  ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
+      String containerIdStr, Token<AMRMTokenIdentifier> appToken,
+      SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
+      throws Exception {
+
+    Container container = new ContainerPBImpl();
+    container.setId(ConverterUtils.toContainerId(containerIdStr));
+    RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
+    when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
+    when(mockAttempt.getMasterContainer()).thenReturn(container);
+    when(mockAttempt.getAMRMToken()).thenReturn(appToken);
+    when(mockAttempt.getClientTokenMasterKey())
+        .thenReturn(clientTokenMasterKey);
+    dispatcher.attemptId = attemptId;
+    dispatcher.storedException = null;
+    store.storeApplicationAttempt(mockAttempt);
+    waitNotify(dispatcher);
+    return container.getId();
+  }
+
+  /**
+   * Stores two attempts of one application plus a second application that is
+   * then removed, reloads state through a fresh store instance, and checks
+   * that the surviving application, its attempts, AMRM tokens and client
+   * token master keys round-trip correctly.
+   */
+  void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
+      throws Exception {
+    long submitTime = System.currentTimeMillis();
+    Configuration conf = new YarnConfiguration();
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    AMRMTokenSecretManager appTokenMgr =
+        new AMRMTokenSecretManager(conf);
+    ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+        new ClientToAMTokenSecretManagerInRM();
+
+    ApplicationAttemptId attemptId1 = ConverterUtils
+        .toApplicationAttemptId("appattempt_1352994193343_0001_000001");
+    ApplicationId appId1 = attemptId1.getApplicationId();
+    storeApp(store, appId1, submitTime);
+
+    // create application token and client token key for attempt1
+    Token<AMRMTokenIdentifier> appAttemptToken1 =
+        generateAMRMToken(attemptId1, appTokenMgr);
+    HashSet<Token<?>> attemptTokenSet1 = new HashSet<Token<?>>();
+    attemptTokenSet1.add(appAttemptToken1);
+    SecretKey clientTokenKey1 =
+        clientToAMTokenMgr.createMasterKey(attemptId1);
+
+    ContainerId containerId1 = storeAttempt(store, attemptId1,
+        "container_1352994193343_0001_01_000001",
+        appAttemptToken1, clientTokenKey1, dispatcher);
+
+    String appAttemptIdStr2 = "appattempt_1352994193343_0001_000002";
+    ApplicationAttemptId attemptId2 =
+        ConverterUtils.toApplicationAttemptId(appAttemptIdStr2);
+
+    // create application token and client token key for attempt2
+    Token<AMRMTokenIdentifier> appAttemptToken2 =
+        generateAMRMToken(attemptId2, appTokenMgr);
+    HashSet<Token<?>> attemptTokenSet2 = new HashSet<Token<?>>();
+    attemptTokenSet2.add(appAttemptToken2);
+    SecretKey clientTokenKey2 =
+        clientToAMTokenMgr.createMasterKey(attemptId2);
+
+    ContainerId containerId2 = storeAttempt(store, attemptId2,
+        "container_1352994193343_0001_02_000001",
+        appAttemptToken2, clientTokenKey2, dispatcher);
+
+    ApplicationAttemptId attemptIdRemoved = ConverterUtils
+        .toApplicationAttemptId("appattempt_1352994193343_0002_000001");
+    ApplicationId appIdRemoved = attemptIdRemoved.getApplicationId();
+    storeApp(store, appIdRemoved, submitTime);
+    storeAttempt(store, attemptIdRemoved,
+        "container_1352994193343_0002_01_000001", null, null, dispatcher);
+
+    RMApp mockRemovedApp = mock(RMApp.class);
+    HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
+        new HashMap<ApplicationAttemptId, RMAppAttempt>();
+    ApplicationSubmissionContext context =
+        new ApplicationSubmissionContextPBImpl();
+    context.setApplicationId(appIdRemoved);
+    when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
+    when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
+    when(mockRemovedApp.getAppAttempts()).thenReturn(attempts);
+    RMAppAttempt mockRemovedAttempt = mock(RMAppAttempt.class);
+    when(mockRemovedAttempt.getAppAttemptId()).thenReturn(attemptIdRemoved);
+    attempts.put(attemptIdRemoved, mockRemovedAttempt);
+    store.removeApplication(mockRemovedApp);
+
+    // let things settle down
+    Thread.sleep(1000);
+    store.close();
+
+    // load state
+    store = stateStoreHelper.getRMStateStore();
+    RMState state = store.loadState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        state.getApplicationState();
+
+    ApplicationState appState = rmAppState.get(appId1);
+    // app is loaded
+    assertNotNull(appState);
+    // app is loaded correctly
+    assertEquals(submitTime, appState.getSubmitTime());
+    // submission context is loaded correctly
+    assertEquals(appId1,
+        appState.getApplicationSubmissionContext().getApplicationId());
+    ApplicationAttemptState attemptState = appState.getAttempt(attemptId1);
+    // attempt1 is loaded correctly
+    assertNotNull(attemptState);
+    assertEquals(attemptId1, attemptState.getAttemptId());
+    // attempt1 container is loaded correctly
+    assertEquals(containerId1, attemptState.getMasterContainer().getId());
+    // attempt1 applicationToken is loaded correctly
+    HashSet<Token<?>> savedTokens = new HashSet<Token<?>>();
+    savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
+    assertEquals(attemptTokenSet1, savedTokens);
+    // attempt1 client token master key is loaded correctly
+    assertArrayEquals(clientTokenKey1.getEncoded(),
+        attemptState.getAppAttemptCredentials()
+            .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+
+    attemptState = appState.getAttempt(attemptId2);
+    // attempt2 is loaded correctly
+    assertNotNull(attemptState);
+    assertEquals(attemptId2, attemptState.getAttemptId());
+    // attempt2 container is loaded correctly
+    assertEquals(containerId2, attemptState.getMasterContainer().getId());
+    // attempt2 applicationToken is loaded correctly
+    savedTokens.clear();
+    savedTokens.addAll(attemptState.getAppAttemptCredentials().getAllTokens());
+    assertEquals(attemptTokenSet2, savedTokens);
+    // attempt2 client token master key is loaded correctly
+    assertArrayEquals(clientTokenKey2.getEncoded(),
+        attemptState.getAppAttemptCredentials()
+            .getSecretKey(RMStateStore.AM_CLIENT_TOKEN_MASTER_KEY_NAME));
+
+    // assert store is in expected state after everything is cleaned
+    assertTrue(stateStoreHelper.isFinalStateValid());
+
+    store.close();
+  }
+
+  /**
+   * Stores an RM delegation token with its sequence number and a delegation
+   * master key, then checks that loadState() returns all three.
+   */
+  public void testRMDTSecretManagerStateStore(
+      RMStateStoreHelper stateStoreHelper) throws Exception {
+    RMStateStore store = stateStoreHelper.getRMStateStore();
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    // store RM delegation token;
+    RMDelegationTokenIdentifier dtId1 =
+        new RMDelegationTokenIdentifier(new Text("owner1"),
+            new Text("renewer1"), new Text("realuser1"));
+    Long renewDate1 = Long.valueOf(System.currentTimeMillis());
+    int sequenceNumber = 1111;
+    store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
+        sequenceNumber);
+    Map<RMDelegationTokenIdentifier, Long> token1 =
+        new HashMap<RMDelegationTokenIdentifier, Long>();
+    token1.put(dtId1, renewDate1);
+
+    // store delegation key;
+    DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes());
+    HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
+    keySet.add(key);
+    store.storeRMDTMasterKey(key);
+
+    RMDTSecretManagerState secretManagerState =
+        store.loadState().getRMDTSecretManagerState();
+    Assert.assertEquals(token1, secretManagerState.getTokenState());
+    Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
+    Assert.assertEquals(sequenceNumber,
+        secretManagerState.getDTSequenceNumber());
+
+    // Release the store's resources, as testRMAppStateStore does.
+    store.close();
+  }
+
+  /** Builds an AMRM token for the attempt, signed by appTokenMgr. */
+  private Token<AMRMTokenIdentifier> generateAMRMToken(
+      ApplicationAttemptId attemptId,
+      AMRMTokenSecretManager appTokenMgr) {
+    AMRMTokenIdentifier appTokenId =
+        new AMRMTokenIdentifier(attemptId);
+    Token<AMRMTokenIdentifier> appToken =
+        new Token<AMRMTokenIdentifier>(appTokenId, appTokenMgr);
+    appToken.setService(new Text("appToken service"));
+    return appToken;
+  }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1533803&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Sat Oct 19 18:14:14 2013
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.junit.Assert.assertTrue;
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Test;
+
+/**
+ * Runs the shared RMStateStoreTestBase scenarios against FileSystemRMStateStore
+ * on a single-datanode MiniDFSCluster. It also checks that a leftover ".tmp"
+ * attempt file is gone after state has been loaded.
+ */
+public class TestFSRMStateStore extends RMStateStoreTestBase {
+
+ public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class);
+
+ // Helper that creates FileSystemRMStateStore instances rooted at /Test on
+ // the given cluster.
+ class TestFSRMStateStoreTester implements RMStateStoreHelper {
+
+ Path workingDirPathURI;
+ FileSystemRMStateStore store;
+ MiniDFSCluster cluster;
+
+ // Store subclass that calls init() and start() itself. It asserts that
+ // fs is still null after init(), that fsWorkingPath matches the tester's
+ // working directory, and that fs is set once start() has run.
+ class TestFileSystemRMStore extends FileSystemRMStateStore {
+
+ TestFileSystemRMStore(Configuration conf) throws Exception {
+ init(conf);
+ Assert.assertNull(fs);
+ assertTrue(workingDirPathURI.equals(fsWorkingPath));
+ start();
+ Assert.assertNotNull(fs);
+ }
+ }
+
+ // Creates /Test on the cluster and records its fully-qualified path
+ // (cluster URI + /Test) as the store's working directory.
+ public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
+ Path workingDirPath = new Path("/Test");
+ this.cluster = cluster;
+ FileSystem fs = cluster.getFileSystem();
+ fs.mkdirs(workingDirPath);
+ Path clusterURI = new Path(cluster.getURI());
+ workingDirPathURI = new Path(clusterURI, workingDirPath);
+ // NOTE(review): cluster.getFileSystem() may return a cached FileSystem
+ // instance shared with other callers; confirm closing it here is safe.
+ fs.close();
+ }
+
+ // Returns a new, started store pointed at the working directory.
+ @Override
+ public RMStateStore getRMStateStore() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
+ workingDirPathURI.toString());
+ this.store = new TestFileSystemRMStore(conf);
+ return store;
+ }
+
+ // Passes when exactly one entry remains directly under the working
+ // directory.
+ @Override
+ public boolean isFinalStateValid() throws Exception {
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus[] files = fs.listStatus(workingDirPathURI);
+ return files.length == 1;
+ }
+ }
+
+ @Test
+ public void testFSRMStateStore() throws Exception {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ try {
+ TestFSRMStateStoreTester fsTester = new
TestFSRMStateStoreTester(cluster);
+ // If the state store is FileSystemRMStateStore then add corrupted entry.
+ // It should discard the entry and remove it from file system.
+ FSDataOutputStream fsOut = null;
+ FileSystemRMStateStore fileSystemRMStateStore =
+ (FileSystemRMStateStore) fsTester.getRMStateStore();
+ String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
+ ApplicationAttemptId attemptId3 =
+ ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
+ // Builds <working dir>/FSRMStateRoot/RMAppRoot/<app id>/<attempt id>.tmp,
+ // the directory layout this test assumes the store uses for attempts.
+ Path rootDir =
+ new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
+ Path appRootDir = new Path(rootDir, "RMAppRoot");
+ Path appDir =
+ new Path(appRootDir, attemptId3.getApplicationId().toString());
+ Path tempAppAttemptFile =
+ new Path(appDir, attemptId3.toString() + ".tmp");
+ fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
+ fsOut.write("Some random data ".getBytes());
+ fsOut.close();
+
+ // testRMAppStateStore reloads state via loadState(), which is expected to
+ // delete the partial .tmp record written above.
+ testRMAppStateStore(fsTester);
+ Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
+ .getFileSystem(conf).exists(tempAppAttemptFile));
+ testRMDTSecretManagerStateStore(fsTester);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1533803&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Sat Oct 19 18:14:14 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Test;
+
+/**
+ * Runs the shared RMStateStoreTestBase scenarios against ZKRMStateStore,
+ * connecting to the ZooKeeper server at hostPort inherited from the test base.
+ */
+public class TestZKRMStateStore extends RMStateStoreTestBase {
+
+  public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
+
+  /** Creates ZKRMStateStore instances rooted at the "/Test" znode. */
+  class TestZKRMStateStoreTester implements RMStateStoreHelper {
+
+    ZooKeeper client;
+    ZKRMStateStore store;
+
+    // Store that calls init() and start() in its constructor and overrides
+    // getNewZooKeeper() to hand back the tester's client.
+    class TestZKRMStateStoreInternal extends ZKRMStateStore {
+
+      public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
+          throws Exception {
+        init(conf);
+        start();
+        assertTrue(znodeWorkingPath.equals(workingZnode));
+      }
+
+      @Override
+      public ZooKeeper getNewZooKeeper() throws IOException {
+        return client;
+      }
+    }
+
+    // Creates a fresh client and a started store on every call.
+    @Override
+    public RMStateStore getRMStateStore() throws Exception {
+      String workingZnode = "/Test";
+      YarnConfiguration conf = new YarnConfiguration();
+      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
+      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+      this.client = createClient();
+      this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
+      return this.store;
+    }
+
+    // Passes when exactly one child znode remains under the working path.
+    @Override
+    public boolean isFinalStateValid() throws Exception {
+      List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
+      return nodes.size() == 1;
+    }
+  }
+
+  @Test
+  public void testZKRMStateStoreRealZK() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    testRMAppStateStore(zkTester);
+    testRMDTSecretManagerStateStore(zkTester);
+  }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java?rev=1533803&r1=1533802&r2=1533803&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java Sat Oct 19 18:14:14 2013
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import
org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher;
+import
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
@@ -43,17 +43,20 @@ import static org.junit.Assert.fail;
public class TestZKRMStateStoreZKClientConnections extends
ClientBaseWithFixes {
+
private static final int ZK_OP_WAIT_TIME = 3000;
private Log LOG =
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
class TestZKClient {
+
ZKRMStateStore store;
boolean forExpire = false;
TestForwardingWatcher watcher;
CyclicBarrier syncBarrier = new CyclicBarrier(2);
protected class TestZKRMStateStore extends ZKRMStateStore {
+
public TestZKRMStateStore(Configuration conf, String workingZnode)
throws Exception {
init(conf);
@@ -87,6 +90,7 @@ public class TestZKRMStateStoreZKClientC
private class TestForwardingWatcher extends
ClientBaseWithFixes.CountdownWatcher {
+
public void process(WatchedEvent event) {
super.process(event);
try {
@@ -187,7 +191,7 @@ public class TestZKRMStateStoreZKClientC
}
}
- @Test (timeout = 20000)
+ @Test(timeout = 20000)
public void testSetZKAcl() {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
@@ -196,10 +200,11 @@ public class TestZKRMStateStoreZKClientC
zkClientTester.store.zkClient.delete(zkClientTester.store
.znodeWorkingPath, -1);
fail("Shouldn't be able to delete path");
- } catch (Exception e) {/* expected behavior */}
+ } catch (Exception e) {/* expected behavior */
+ }
}
- @Test (timeout = 20000)
+ @Test(timeout = 20000)
public void testInvalidZKAclConfiguration() {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();