Author: bikas
Date: Wed Jul 17 20:19:49 2013
New Revision: 1504261
URL: http://svn.apache.org/r1504261
Log:
YARN-922. Change FileSystemRMStateStore to use directories (Jian He via bikas)
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1504261&r1=1504260&r2=1504261&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed Jul 17 20:19:49 2013
@@ -500,6 +500,9 @@ Release 2.1.0-beta - 2013-07-02
YARN-927. Change ContainerRequest to not have more than 1 container count
and remove StoreContainerRequest (bikas)
+ YARN-922. Change FileSystemRMStateStore to use directories (Jian He via
+ bikas)
+
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1504261&r1=1504260&r2=1504261&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
Wed Jul 17 20:19:49 2013
@@ -111,70 +111,66 @@ public class FileSystemRMStateStore exte
private void loadRMAppState(RMState rmState) throws Exception {
try {
- FileStatus[] childNodes = fs.listStatus(rmAppRoot);
List<ApplicationAttemptState> attempts =
- new ArrayList<ApplicationAttemptState>();
- for(FileStatus childNodeStatus : childNodes) {
- assert childNodeStatus.isFile();
- String childNodeName = childNodeStatus.getPath().getName();
- Path childNodePath = getNodePath(rmAppRoot, childNodeName);
- byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
- if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
- // application
- LOG.info("Loading application from node: " + childNodeName);
- ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
- ApplicationStateDataPBImpl appStateData =
- new ApplicationStateDataPBImpl(
-
ApplicationStateDataProto.parseFrom(childData));
- ApplicationState appState = new ApplicationState(
- appStateData.getSubmitTime(),
- appStateData.getApplicationSubmissionContext(),
- appStateData.getUser());
- // assert child node name is same as actual applicationId
- assert appId.equals(appState.context.getApplicationId());
- rmState.appState.put(appId, appState);
- } else if(childNodeName.startsWith(
- ApplicationAttemptId.appAttemptIdStrPrefix)) {
- // attempt
- LOG.info("Loading application attempt from node: " + childNodeName);
- ApplicationAttemptId attemptId =
- ConverterUtils.toApplicationAttemptId(childNodeName);
- ApplicationAttemptStateDataPBImpl attemptStateData =
- new ApplicationAttemptStateDataPBImpl(
+ new ArrayList<ApplicationAttemptState>();
+
+ for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
+ for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ byte[] childData =
+ readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
+ if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+ // application
+ LOG.info("Loading application from node: " + childNodeName);
+ ApplicationId appId =
ConverterUtils.toApplicationId(childNodeName);
+ ApplicationStateDataPBImpl appStateData =
+ new ApplicationStateDataPBImpl(
+ ApplicationStateDataProto.parseFrom(childData));
+ ApplicationState appState =
+ new ApplicationState(appStateData.getSubmitTime(),
+ appStateData.getApplicationSubmissionContext(),
+ appStateData.getUser());
+ // assert child node name is same as actual applicationId
+ assert appId.equals(appState.context.getApplicationId());
+ rmState.appState.put(appId, appState);
+ } else if (childNodeName
+ .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+ // attempt
+ LOG.info("Loading application attempt from node: " +
childNodeName);
+ ApplicationAttemptId attemptId =
+ ConverterUtils.toApplicationAttemptId(childNodeName);
+ ApplicationAttemptStateDataPBImpl attemptStateData =
+ new ApplicationAttemptStateDataPBImpl(
ApplicationAttemptStateDataProto.parseFrom(childData));
- Credentials credentials = null;
- if(attemptStateData.getAppAttemptTokens() != null){
- credentials = new Credentials();
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- dibb.reset(attemptStateData.getAppAttemptTokens());
- credentials.readTokenStorageStream(dibb);
+ Credentials credentials = null;
+ if (attemptStateData.getAppAttemptTokens() != null) {
+ credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(attemptStateData.getAppAttemptTokens());
+ credentials.readTokenStorageStream(dibb);
+ }
+ ApplicationAttemptState attemptState =
+ new ApplicationAttemptState(attemptId,
+ attemptStateData.getMasterContainer(), credentials);
+
+ // assert child node name is same as application attempt id
+ assert attemptId.equals(attemptState.getAttemptId());
+ attempts.add(attemptState);
+ } else {
+ LOG.info("Unknown child node with name: " + childNodeName);
}
- ApplicationAttemptState attemptState =
- new ApplicationAttemptState(attemptId,
- attemptStateData.getMasterContainer(), credentials);
-
- // assert child node name is same as application attempt id
- assert attemptId.equals(attemptState.getAttemptId());
- attempts.add(attemptState);
- } else {
- LOG.info("Unknown child node with name: " + childNodeName);
}
}
- // go through all attempts and add them to their apps
- for(ApplicationAttemptState attemptState : attempts) {
+ // go through all attempts and add them to their apps. Ideally, each
+ // attempt node must have a corresponding app node, because the remove
+ // directory operation removes both at the same time
+ for (ApplicationAttemptState attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
ApplicationState appState = rmState.appState.get(appId);
- if(appState != null) {
- appState.attempts.put(attemptState.getAttemptId(), attemptState);
- } else {
- // the application node may have been removed when the application
- // completed but the RM might have stopped before it could remove the
- // application attempt nodes
- LOG.info("Application node not found for attempt: "
- + attemptState.getAttemptId());
- deleteFile(getNodePath(rmAppRoot,
attemptState.getAttemptId().toString()));
- }
+ assert appState != null;
+ appState.attempts.put(attemptState.getAttemptId(), attemptState);
}
} catch (Exception e) {
LOG.error("Failed to load state.", e);
@@ -188,6 +184,12 @@ public class FileSystemRMStateStore exte
for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
+ if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
+ rmState.rmSecretManagerState.dtSequenceNumber =
+ Integer.parseInt(childNodeName.split("_")[1]);
+ continue;
+ }
+
Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
ByteArrayInputStream is = new ByteArrayInputStream(childData);
@@ -202,10 +204,7 @@ public class FileSystemRMStateStore exte
long renewDate = fsIn.readLong();
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
renewDate);
- } else
if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
- rmState.rmSecretManagerState.dtSequenceNumber =
- Integer.parseInt(childNodeName.split("_")[1]);
- }else {
+ } else {
LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
}
fsIn.close();
@@ -215,7 +214,9 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void storeApplicationState(String appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
- Path nodeCreatePath = getNodePath(rmAppRoot, appId);
+ Path appDirPath = getAppDir(rmAppRoot, appId);
+ fs.mkdirs(appDirPath);
+ Path nodeCreatePath = getNodePath(appDirPath, appId);
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -232,7 +233,11 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void storeApplicationAttemptState(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception {
- Path nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+ ApplicationAttemptId appAttemptId =
+ ConverterUtils.toApplicationAttemptId(attemptId);
+ Path appDirPath =
+ getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
+ Path nodeCreatePath = getNodePath(appDirPath, attemptId);
LOG.info("Storing info for attempt: " + attemptId
+ " at: " + nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@@ -250,20 +255,9 @@ public class FileSystemRMStateStore exte
public synchronized void removeApplicationState(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
- Path nodeRemovePath = getNodePath(rmAppRoot, appId);
+ Path nodeRemovePath = getAppDir(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath);
- for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
- removeApplicationAttemptState(attemptId.toString());
- }
- }
-
- public synchronized void removeApplicationAttemptState(String attemptId)
- throws Exception {
- Path nodeRemovePath = getNodePath(rmAppRoot, attemptId);
- LOG.info("Removing info for attempt: " + attemptId
- + " at: " + nodeRemovePath);
- deleteFile(nodeRemovePath);
}
@Override
@@ -329,6 +323,10 @@ public class FileSystemRMStateStore exte
deleteFile(nodeCreatePath);
}
+ private Path getAppDir(Path root, String appId) {
+ return getNodePath(root, appId);
+ }
+
// FileSystem related code
private void deleteFile(Path deletePath) throws Exception {
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java?rev=1504261&r1=1504260&r2=1504261&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
Wed Jul 17 20:19:49 2013
@@ -105,8 +105,6 @@ public class TestRMStateStore {
interface RMStateStoreHelper {
RMStateStore getRMStateStore() throws Exception;
- void addOrphanAttemptIfNeeded(RMStateStore testStore,
- TestDispatcher dispatcher) throws Exception;
boolean isFinalStateValid() throws Exception;
}
@@ -154,15 +152,6 @@ public class TestRMStateStore {
}
@Override
- public void addOrphanAttemptIfNeeded(RMStateStore testStore,
- TestDispatcher dispatcher) throws Exception {
- ApplicationAttemptId attemptId = ConverterUtils.toApplicationAttemptId(
- "appattempt_1352994193343_0003_000001");
- storeAttempt(testStore, attemptId,
- "container_1352994193343_0003_01_000001", null, null, dispatcher);
- }
-
- @Override
public boolean isFinalStateValid() throws Exception {
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(workingDirPathURI);
@@ -289,9 +278,6 @@ public class TestRMStateStore {
attempts.put(attemptIdRemoved, mockRemovedAttempt);
store.removeApplication(mockRemovedApp);
- // add orphan attempt file to simulate incomplete removal of app state
- stateStoreHelper.addOrphanAttemptIfNeeded(store, dispatcher);
-
// let things settle down
Thread.sleep(1000);
store.close();
@@ -301,9 +287,6 @@ public class TestRMStateStore {
RMState state = store.loadState();
Map<ApplicationId, ApplicationState> rmAppState =
state.getApplicationState();
- // removed app or orphan attempt is not loaded
- assertEquals(1, rmAppState.size());
-
ApplicationState appState = rmAppState.get(appId1);
// app is loaded
assertNotNull(appState);