Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Tue Aug 19 23:49:39 2014 @@ -32,9 +32,10 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import com.google.common.annotations.VisibleForTesting; @@ -43,6 +44,8 @@ import com.google.common.annotations.Vis public class MemoryRMStateStore extends RMStateStore { RMState state = new RMState(); + private int epoch = 0; + @VisibleForTesting public RMState getState() { return state; @@ -53,6 +56,13 @@ public class MemoryRMStateStore extends } @Override + public synchronized int getAndIncrementEpoch() throws Exception { + int currentEpoch = epoch; + epoch = epoch + 1; + return currentEpoch; + } + + @Override public synchronized RMState loadState() throws Exception { // return a copy of the state to allow for modification of the real state RMState returnState = new RMState(); @@ -63,6 +73,10 @@ public class MemoryRMStateStore extends state.rmSecretManagerState.getTokenState()); returnState.rmSecretManagerState.dtSequenceNumber = state.rmSecretManagerState.dtSequenceNumber; + returnState.amrmTokenSecretManagerState = + state.amrmTokenSecretManagerState == null ? null + : AMRMTokenSecretManagerState + .newInstance(state.amrmTokenSecretManagerState); return returnState; } @@ -80,7 +94,7 @@ public class MemoryRMStateStore extends @Override public void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) + ApplicationStateData appStateData) throws Exception { ApplicationState appState = new ApplicationState(appStateData.getSubmitTime(), @@ -92,7 +106,7 @@ public class MemoryRMStateStore extends @Override public void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception { + ApplicationStateData appStateData) throws Exception { ApplicationState updatedAppState = new ApplicationState(appStateData.getSubmitTime(), appStateData.getStartTime(), @@ -112,7 +126,7 @@ public class MemoryRMStateStore extends @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) + ApplicationAttemptStateData attemptStateData) throws Exception { Credentials credentials = null; if(attemptStateData.getAppAttemptTokens() != null){ @@ -137,7 +151,7 @@ public class MemoryRMStateStore extends @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) + ApplicationAttemptStateData attemptStateData) throws Exception { Credentials credentials = null; if (attemptStateData.getAppAttemptTokens() != null) { @@ -152,7 +166,8 @@ public class MemoryRMStateStore extends attemptStateData.getStartTime(), attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus()); + attemptStateData.getFinalApplicationStatus(), + attemptStateData.getAMContainerExitStatus()); ApplicationState appState = state.getApplicationState().get( @@ -244,7 +259,7 @@ public class MemoryRMStateStore extends } @Override - protected RMStateVersion loadVersion() throws Exception { + protected Version loadVersion() throws Exception { return null; } @@ -253,8 +268,22 @@ public class MemoryRMStateStore extends } @Override - protected RMStateVersion getCurrentVersion() { + protected Version getCurrentVersion() { return null; } + @Override + public void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState amrmTokenSecretManagerState, + boolean isUpdate) { + if (amrmTokenSecretManagerState != null) { + state.amrmTokenSecretManagerState = AMRMTokenSecretManagerState + .newInstance(amrmTokenSecretManagerState); + } + } + + @Override + public void deleteStore() throws Exception { + } + }
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Tue Aug 19 23:49:39 2014 @@ -25,9 +25,10 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @Unstable public class NullRMStateStore extends RMStateStore { @@ -48,19 +49,24 @@ public class NullRMStateStore extends RM } @Override + public synchronized int getAndIncrementEpoch() throws Exception { + return 0; + } + + @Override public RMState loadState() throws Exception { throw new UnsupportedOperationException("Cannot load state from null store"); } @Override protected void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception { + ApplicationStateData appStateData) throws Exception { // Do nothing } @Override protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + ApplicationAttemptStateData attemptStateData) throws Exception { // Do nothing } @@ -102,13 +108,13 @@ public class NullRMStateStore extends RM @Override protected void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception { + ApplicationStateData appStateData) throws Exception { // Do nothing } @Override protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception { + ApplicationAttemptStateData attemptStateData) throws Exception { } @Override @@ -117,7 +123,7 @@ public class NullRMStateStore extends RM } @Override - protected RMStateVersion loadVersion() throws Exception { + protected Version loadVersion() throws Exception { // Do nothing return null; } @@ -128,9 +134,20 @@ public class NullRMStateStore extends RM } @Override - protected RMStateVersion getCurrentVersion() { + protected Version getCurrentVersion() { // Do nothing return null; } + @Override + public void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState state, boolean isUpdate) { + //DO Nothing + } + + @Override + public void deleteStore() throws Exception { + // Do nothing + } + } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Tue Aug 19 23:49:39 2014 @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -31,36 +30,39 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; +import org.apache.hadoop.yarn.state.InvalidStateTransitonException; +import org.apache.hadoop.yarn.state.SingleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; @Private @Unstable @@ -79,12 +81,174 @@ public abstract class RMStateStore exten protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = "RMDTSequenceNumber_"; + protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT = + "AMRMTokenSecretManagerRoot"; protected static final String VERSION_NODE = "RMVersionNode"; + protected static final String EPOCH_NODE = "EpochNode"; public static final Log LOG = LogFactory.getLog(RMStateStore.class); + private enum RMStateStoreState { + DEFAULT + }; + + private static final StateMachineFactory<RMStateStore, + RMStateStoreState, + RMStateStoreEventType, + RMStateStoreEvent> + stateMachineFactory = new StateMachineFactory<RMStateStore, + RMStateStoreState, + RMStateStoreEventType, + RMStateStoreEvent>( + RMStateStoreState.DEFAULT) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.STORE_APP, new StoreAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition()) + .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()); + + private final StateMachine<RMStateStoreState, + RMStateStoreEventType, + RMStateStoreEvent> stateMachine; + + private static class StoreAppTransition + implements SingleArcTransition<RMStateStore, RMStateStoreEvent> { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreAppEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); + ApplicationId appId = appState.getAppId(); + ApplicationStateData appStateData = ApplicationStateData + .newInstance(appState); + LOG.info("Storing info for app: " + appId); + try { + store.storeApplicationStateInternal(appId, appStateData); + store.notifyApplication(new RMAppEvent(appId, + RMAppEventType.APP_NEW_SAVED)); + } catch (Exception e) { + LOG.error("Error storing app: " + appId, e); + store.notifyStoreOperationFailed(e); + } + }; + } + + private static class UpdateAppTransition implements + SingleArcTransition<RMStateStore, RMStateStoreEvent> { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateUpdateAppEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationState appState = ((RMStateUpdateAppEvent) event).getAppState(); + ApplicationId appId = appState.getAppId(); + ApplicationStateData appStateData = ApplicationStateData + .newInstance(appState); + LOG.info("Updating info for app: " + appId); + try { + store.updateApplicationStateInternal(appId, appStateData); + store.notifyApplication(new RMAppEvent(appId, + RMAppEventType.APP_UPDATE_SAVED)); + } catch (Exception e) { + LOG.error("Error updating app: " + appId, e); + store.notifyStoreOperationFailed(e); + } + }; + } + + private static class RemoveAppTransition implements + SingleArcTransition<RMStateStore, RMStateStoreEvent> { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreRemoveAppEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationState appState = ((RMStateStoreRemoveAppEvent) event) + .getAppState(); + ApplicationId appId = appState.getAppId(); + LOG.info("Removing info for app: " + appId); + try { + store.removeApplicationStateInternal(appState); + } catch (Exception e) { + LOG.error("Error removing app: " + appId, e); + store.notifyStoreOperationFailed(e); + } + }; + } + + private static class StoreAppAttemptTransition implements + SingleArcTransition<RMStateStore, RMStateStoreEvent> { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreAppAttemptEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationAttemptState attemptState = + ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); + try { + ApplicationAttemptStateData attemptStateData = + ApplicationAttemptStateData.newInstance(attemptState); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); + } + store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); + store.notifyApplicationAttempt(new RMAppAttemptEvent + (attemptState.getAttemptId(), + RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); + } catch (Exception e) { + LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); + store.notifyStoreOperationFailed(e); + } + }; + } + + private static class UpdateAppAttemptTransition implements + SingleArcTransition<RMStateStore, RMStateStoreEvent> { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateUpdateAppAttemptEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + ApplicationAttemptState attemptState = + ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); + try { + ApplicationAttemptStateData attemptStateData = ApplicationAttemptStateData + .newInstance(attemptState); + if (LOG.isDebugEnabled()) { + LOG.debug("Updating info for attempt: " + attemptState.getAttemptId()); + } + store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), + attemptStateData); + store.notifyApplicationAttempt(new RMAppAttemptEvent + (attemptState.getAttemptId(), + RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); + } catch (Exception e) { + LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); + store.notifyStoreOperationFailed(e); + } + }; + } + public RMStateStore() { super(RMStateStore.class.getName()); + stateMachine = stateMachineFactory.make(this); } /** @@ -99,19 +263,21 @@ public abstract class RMStateStore exten RMAppAttemptState state; String finalTrackingUrl = "N/A"; String diagnostics; + int exitStatus = ContainerExitStatus.INVALID; FinalApplicationStatus amUnregisteredFinalStatus; public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, long startTime) { this(attemptId, masterContainer, appAttemptCredentials, startTime, null, - null, "", null); + null, "", null, ContainerExitStatus.INVALID); } public ApplicationAttemptState(ApplicationAttemptId attemptId, Container masterContainer, Credentials appAttemptCredentials, long startTime, RMAppAttemptState state, String finalTrackingUrl, - String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus) { + String diagnostics, FinalApplicationStatus amUnregisteredFinalStatus, + int exitStatus) { this.attemptId = attemptId; this.masterContainer = masterContainer; this.appAttemptCredentials = appAttemptCredentials; @@ -120,6 +286,7 @@ public abstract class RMStateStore exten this.finalTrackingUrl = finalTrackingUrl; this.diagnostics = diagnostics == null ? "" : diagnostics; this.amUnregisteredFinalStatus = amUnregisteredFinalStatus; + this.exitStatus = exitStatus; } public Container getMasterContainer() { @@ -146,6 +313,9 @@ public abstract class RMStateStore exten public FinalApplicationStatus getFinalApplicationStatus() { return amUnregisteredFinalStatus; } + public int getAMContainerExitStatus(){ + return this.exitStatus; + } } /** @@ -244,6 +414,8 @@ public abstract class RMStateStore exten RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState(); + AMRMTokenSecretManagerState amrmTokenSecretManagerState = null; + public Map<ApplicationId, ApplicationState> getApplicationState() { return appState; } @@ -251,6 +423,10 @@ public abstract class RMStateStore exten public RMDTSecretManagerState getRMDTSecretManagerState() { return rmSecretManagerState; } + + public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() { + return amrmTokenSecretManagerState; + } } private Dispatcher rmDispatcher; @@ -319,14 +495,14 @@ public abstract class RMStateStore exten * upgrade RM state. */ public void checkVersion() throws Exception { - RMStateVersion loadedVersion = loadVersion(); + Version loadedVersion = loadVersion(); LOG.info("Loaded RM state version info " + loadedVersion); if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { return; } // if there is no version info, treat it as 1.0; if (loadedVersion == null) { - loadedVersion = RMStateVersion.newInstance(1, 0); + loadedVersion = Version.newInstance(1, 0); } if (loadedVersion.isCompatibleTo(getCurrentVersion())) { LOG.info("Storing RM state version info " + getCurrentVersion()); @@ -342,7 +518,7 @@ public abstract class RMStateStore exten * Derived class use this method to load the version information from state * store. */ - protected abstract RMStateVersion loadVersion() throws Exception; + protected abstract Version loadVersion() throws Exception; /** * Derived class use this method to store the version information. @@ -352,8 +528,14 @@ public abstract class RMStateStore exten /** * Get the current version of the underlying state store. */ - protected abstract RMStateVersion getCurrentVersion(); + protected abstract Version getCurrentVersion(); + + /** + * Get the current epoch of RM and increment the value. + */ + public abstract int getAndIncrementEpoch() throws Exception; + /** * Blocking API * The derived class must recover state from the store and return a new @@ -390,10 +572,10 @@ public abstract class RMStateStore exten * application. */ protected abstract void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception; + ApplicationStateData appStateData) throws Exception; protected abstract void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateData) throws Exception; + ApplicationStateData appStateData) throws Exception; @SuppressWarnings("unchecked") /** @@ -428,11 +610,11 @@ public abstract class RMStateStore exten */ protected abstract void storeApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; + ApplicationAttemptStateData attemptStateData) throws Exception; protected abstract void updateApplicationAttemptStateInternal( ApplicationAttemptId attemptId, - ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception; + ApplicationAttemptStateData attemptStateData) throws Exception; /** * RMDTSecretManager call this to store the state of a delegation token @@ -540,6 +722,14 @@ public abstract class RMStateStore exten throws Exception; /** + * Blocking API Derived classes must implement this method to store or update + * the state of AMRMToken Master Key + */ + public abstract void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState amrmTokenSecretManagerState, + boolean isUpdate); + + /** * Non-blocking API * ResourceManager services call this to remove an application from the state * store @@ -581,10 +771,7 @@ public abstract class RMStateStore exten public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { Credentials credentials = new Credentials(); - Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken(); - if(appToken != null){ - credentials.addToken(AM_RM_TOKEN_SERVICE, appToken); - } + SecretKey clientTokenMasterKey = appAttempt.getClientTokenMasterKey(); if(clientTokenMasterKey != null){ @@ -596,105 +783,10 @@ public abstract class RMStateStore exten // Dispatcher related code protected void handleStoreEvent(RMStateStoreEvent event) { - if (event.getType().equals(RMStateStoreEventType.STORE_APP) - || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) { - ApplicationState appState = null; - if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - appState = ((RMStateStoreAppEvent) event).getAppState(); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); - appState = ((RMStateUpdateAppEvent) event).getAppState(); - } - - Exception storedException = null; - ApplicationStateDataPBImpl appStateData = - (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl - .newApplicationStateData(appState.getSubmitTime(), - appState.getStartTime(), appState.getUser(), - appState.getApplicationSubmissionContext(), appState.getState(), - appState.getDiagnostics(), appState.getFinishTime()); - - ApplicationId appId = - appState.getApplicationSubmissionContext().getApplicationId(); - - LOG.info("Storing info for app: " + appId); - try { - if (event.getType().equals(RMStateStoreEventType.STORE_APP)) { - storeApplicationStateInternal(appId, appStateData); - notifyDoneStoringApplication(appId, storedException); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP); - updateApplicationStateInternal(appId, appStateData); - notifyDoneUpdatingApplication(appId, storedException); - } - } catch (Exception e) { - LOG.error("Error storing/updating app: " + appId, e); - notifyStoreOperationFailed(e); - } - } else if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT) - || event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT)) { - - ApplicationAttemptState attemptState = null; - if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - attemptState = - ((RMStateStoreAppAttemptEvent) event).getAppAttemptState(); - } else { - assert event.getType().equals(RMStateStoreEventType.UPDATE_APP_ATTEMPT); - attemptState = - ((RMStateUpdateAppAttemptEvent) event).getAppAttemptState(); - } - - Exception storedException = null; - Credentials credentials = attemptState.getAppAttemptCredentials(); - ByteBuffer appAttemptTokens = null; - try { - if (credentials != null) { - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); - } - ApplicationAttemptStateDataPBImpl attemptStateData = - (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl - .newApplicationAttemptStateData(attemptState.getAttemptId(), - attemptState.getMasterContainer(), appAttemptTokens, - attemptState.getStartTime(), attemptState.getState(), - attemptState.getFinalTrackingUrl(), - attemptState.getDiagnostics(), - attemptState.getFinalApplicationStatus()); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); - } - if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) { - storeApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); - notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), - storedException); - } else { - assert event.getType().equals( - RMStateStoreEventType.UPDATE_APP_ATTEMPT); - updateApplicationAttemptStateInternal(attemptState.getAttemptId(), - attemptStateData); - notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), - storedException); - } - } catch (Exception e) { - LOG.error( - "Error storing/updating appAttempt: " + attemptState.getAttemptId(), e); - notifyStoreOperationFailed(e); - } - } else if (event.getType().equals(RMStateStoreEventType.REMOVE_APP)) { - ApplicationState appState = - ((RMStateStoreRemoveAppEvent) event).getAppState(); - ApplicationId appId = appState.getAppId(); - LOG.info("Removing info for app: " + appId); - try { - removeApplicationStateInternal(appState); - } catch (Exception e) { - LOG.error("Error removing app: " + appId, e); - notifyStoreOperationFailed(e); - } - } else { - LOG.error("Unknown RMStateStoreEvent type: " + event.getType()); + try { + this.stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state", e); } } @@ -713,47 +805,28 @@ public abstract class RMStateStore exten } rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause)); } - + @SuppressWarnings("unchecked") /** - * In (@link handleStoreEvent}, this method is called to notify the - * application that new application is stored in state store - * @param appId id of the application that has been saved - * @param storedException the exception that is thrown when storing the - * application - */ - private void notifyDoneStoringApplication(ApplicationId appId, - Exception storedException) { - rmDispatcher.getEventHandler().handle( - new RMAppNewSavedEvent(appId, storedException)); - } - - @SuppressWarnings("unchecked") - private void notifyDoneUpdatingApplication(ApplicationId appId, - Exception storedException) { - rmDispatcher.getEventHandler().handle( - new RMAppUpdateSavedEvent(appId, storedException)); + * This method is called to notify the application that + * new application is stored or updated in state store + * @param event App event containing the app id and event type + */ + private void notifyApplication(RMAppEvent event) { + rmDispatcher.getEventHandler().handle(event); } - + @SuppressWarnings("unchecked") /** - * In (@link handleStoreEvent}, this method is called to notify the - * application attempt that new attempt is stored in state store - * @param appAttempt attempt that has been saved - */ - private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, - Exception storedException) { - rmDispatcher.getEventHandler().handle( - new RMAppAttemptNewSavedEvent(attemptId, storedException)); - } - - @SuppressWarnings("unchecked") - private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId, - Exception updatedException) { - rmDispatcher.getEventHandler().handle( - new RMAppAttemptUpdateSavedEvent(attemptId, updatedException)); + * This method is called to notify the application attempt + * that new attempt is stored or updated in state store + * @param event App attempt event containing the app attempt + * id and event type + */ + private void notifyApplicationAttempt(RMAppAttemptEvent event) { + rmDispatcher.getEventHandler().handle(event); } - + /** * EventHandler implementation which forward events to the FSRMStateStore * This hides the EventHandle methods of the store from its public interface @@ -766,4 +839,10 @@ public abstract class RMStateStore exten handleStoreEvent(event); } } + + /** + * Derived classes must implement this method to delete the state store + * @throws Exception + */ + public abstract void deleteStore() throws Exception; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Tue Aug 19 23:49:39 2014 @@ -44,15 +44,23 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -70,6 +78,11 @@ import org.apache.zookeeper.server.auth. import com.google.common.annotations.VisibleForTesting; +/** + * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved + * separately. The currentMasterkey and nextMasterkey have been stored. + * Also, AMRMToken has been removed from ApplicationAttemptState. + */ @Private @Unstable public class ZKRMStateStore extends RMStateStore { @@ -78,8 +91,8 @@ public class ZKRMStateStore extends RMSt private final SecureRandom random = new SecureRandom(); protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; - protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion - .newInstance(1, 0); + protected static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 2); private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = "RMDelegationTokensRoot"; private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = @@ -90,7 +103,9 @@ public class ZKRMStateStore extends RMSt private String zkHostPort = null; private int zkSessionTimeout; - private long zkRetryInterval; + + @VisibleForTesting + long zkRetryInterval; private List<ACL> zkAcl; private List<ZKUtil.ZKAuthInfo> zkAuths; @@ -98,6 +113,7 @@ public class ZKRMStateStore extends RMSt * * ROOT_DIR_PATH * |--- VERSION_INFO + * |--- EPOCH_NODE * |--- RM_ZK_FENCING_LOCK * |--- RM_APP_ROOT * | |----- (#ApplicationId1) @@ -118,6 +134,9 @@ public class ZKRMStateStore extends RMSt * | |----- Key_1 * | |----- Key_2 * .... + * |--- AMRMTOKEN_SECRET_MANAGER_ROOT + * |----- currentMasterKey + * |----- nextMasterKey * */ private String zkRootNodePath; @@ -126,6 +145,7 @@ public class ZKRMStateStore extends RMSt private String dtMasterKeysRootPath; private String delegationTokensRootPath; private String dtSequenceNumberPath; + private String amrmTokenSecretManagerRoot; @VisibleForTesting protected String znodeWorkingPath; @@ -199,9 +219,14 @@ public class ZKRMStateStore extends RMSt zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); - zkRetryInterval = - conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS); + + if (HAUtil.isHAEnabled(conf)) { + zkRetryInterval = zkSessionTimeout / numRetries; + } else { + zkRetryInterval = + conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS); + } zkAcl = RMZKUtils.getZKAcls(conf); zkAuths = RMZKUtils.getZKAuths(conf); @@ -240,6 +265,8 @@ public class ZKRMStateStore extends RMSt RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME); dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot, RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME); + amrmTokenSecretManagerRoot = + getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); } @Override @@ -260,24 +287,26 @@ public class ZKRMStateStore extends RMSt createRootDir(dtMasterKeysRootPath); createRootDir(delegationTokensRootPath); createRootDir(dtSequenceNumberPath); + createRootDir(amrmTokenSecretManagerRoot); } private void createRootDir(final String rootPath) throws Exception { // For root dirs, we shouldn't use the doMulti helper methods - try { - new ZKAction<String>() { - @Override - public String run() throws KeeperException, InterruptedException { + new ZKAction<String>() { + @Override + public String run() throws KeeperException, InterruptedException { + try { return zkClient.create(rootPath, null, zkAcl, CreateMode.PERSISTENT); + } catch (KeeperException ke) { + if (ke.code() == Code.NODEEXISTS) { + LOG.debug(rootPath + "znode already exists!"); + return null; + } else { + throw ke; + } } - }.runWithRetries(); - } catch (KeeperException ke) { - if (ke.code() == Code.NODEEXISTS) { - LOG.debug(rootPath + "znode already exists!"); - } else { - throw ke; } - } + }.runWithRetries(); } private void logRootNodeAcls(String prefix) throws Exception { @@ -353,7 +382,7 @@ public class ZKRMStateStore extends RMSt } @Override - protected RMStateVersion getCurrentVersion() { + protected Version getCurrentVersion() { return CURRENT_VERSION_INFO; } @@ -361,7 +390,7 @@ public class ZKRMStateStore extends RMSt protected synchronized void storeVersion() throws Exception { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); byte[] data = - ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); if (existsWithRetries(versionNodePath, true) != null) { setDataWithRetries(versionNodePath, data, -1); } else { @@ -370,28 +399,68 @@ public class ZKRMStateStore extends RMSt } @Override - protected synchronized RMStateVersion loadVersion() throws Exception { + protected synchronized Version loadVersion() throws Exception { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); if (existsWithRetries(versionNodePath, true) != null) { byte[] data = getDataWithRetries(versionNodePath, true); - RMStateVersion version = - new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data)); + Version version = + new VersionPBImpl(VersionProto.parseFrom(data)); return version; } return null; } @Override + public synchronized int getAndIncrementEpoch() throws Exception { + String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE); + int currentEpoch = 0; + if (existsWithRetries(epochNodePath, true) != null) { + // load current epoch + byte[] data = getDataWithRetries(epochNodePath, true); + Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); + currentEpoch = epoch.getEpoch(); + // increment epoch and store it + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + .toByteArray(); + setDataWithRetries(epochNodePath, storeData, -1); + } else { + // initialize epoch node with 1 for the next time. + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + .toByteArray(); + createWithRetries(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT); + } + return currentEpoch; + } + + @Override public synchronized RMState loadState() throws Exception { RMState rmState = new RMState(); // recover DelegationTokenSecretManager loadRMDTSecretManagerState(rmState); // recover RM applications loadRMAppState(rmState); + // recover AMRMTokenSecretManager + loadAMRMTokenSecretManagerState(rmState); return rmState; } + private void loadAMRMTokenSecretManagerState(RMState rmState) + throws Exception { + byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true); + if (data == null) { + LOG.warn("There is no data saved"); + return; + } + AMRMTokenSecretManagerStatePBImpl stateData = + new AMRMTokenSecretManagerStatePBImpl( + AMRMTokenSecretManagerStateProto.parseFrom(data)); + rmState.amrmTokenSecretManagerState = + AMRMTokenSecretManagerState.newInstance( + stateData.getCurrentMasterKey(), stateData.getNextMasterKey()); + + } + private synchronized void loadRMDTSecretManagerState(RMState rmState) throws Exception { loadRMDelegationKeyState(rmState); @@ -529,22 +598,22 @@ public class ZKRMStateStore extends RMSt ApplicationAttemptState attemptState = new ApplicationAttemptState(attemptId, - attemptStateData.getMasterContainer(), credentials, - attemptStateData.getStartTime(), - attemptStateData.getState(), - attemptStateData.getFinalTrackingUrl(), - attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus()); + attemptStateData.getMasterContainer(), credentials, + attemptStateData.getStartTime(), attemptStateData.getState(), + attemptStateData.getFinalTrackingUrl(), + attemptStateData.getDiagnostics(), + attemptStateData.getFinalApplicationStatus(), + attemptStateData.getAMContainerExitStatus()); appState.attempts.put(attemptState.getAttemptId(), attemptState); } } - LOG.info("Done Loading applications from ZK state store"); + LOG.debug("Done Loading applications from ZK state store"); } @Override public synchronized void storeApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateDataPB) throws Exception { + ApplicationStateData appStateDataPB) throws Exception { String nodeCreatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { @@ -558,7 +627,7 @@ public class ZKRMStateStore extends RMSt @Override public synchronized void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateDataPB) throws Exception { + ApplicationStateData appStateDataPB) throws Exception { String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString()); if (LOG.isDebugEnabled()) { @@ -572,7 +641,7 @@ public class ZKRMStateStore extends RMSt } else { createWithRetries(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT); - LOG.info(appId + " znode didn't exist. Created a new znode to" + LOG.debug(appId + " znode didn't exist. Created a new znode to" + " update the application state."); } } @@ -580,7 +649,7 @@ public class ZKRMStateStore extends RMSt @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptStateData attemptStateDataPB) throws Exception { String appDirPath = getNodePath(rmAppRoot, appAttemptId.getApplicationId().toString()); @@ -598,7 +667,7 @@ public class ZKRMStateStore extends RMSt @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptStateData attemptStateDataPB) throws Exception { String appIdStr = appAttemptId.getApplicationId().toString(); String appAttemptIdStr = appAttemptId.toString(); @@ -615,7 +684,7 @@ public class ZKRMStateStore extends RMSt } else { createWithRetries(nodeUpdatePath, attemptStateData, zkAcl, CreateMode.PERSISTENT); - LOG.info(appAttemptId + " znode didn't exist. Created a new znode to" + LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to" + " update the application attempt state."); } } @@ -664,7 +733,7 @@ public class ZKRMStateStore extends RMSt if (existsWithRetries(nodeRemovePath, true) != null) { opList.add(Op.delete(nodeRemovePath, -1)); } else { - LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath); + LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); } doMultiWithRetries(opList); } @@ -681,7 +750,7 @@ public class ZKRMStateStore extends RMSt // in case znode doesn't exist addStoreOrUpdateOps( opList, rmDTIdentifier, renewDate, latestSequenceNumber, false); - LOG.info("Attempted to update a non-existing znode " + nodeRemovePath); + LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath); } else { // in case znode exists addStoreOrUpdateOps( @@ -763,7 +832,14 @@ public class ZKRMStateStore extends RMSt if (existsWithRetries(nodeRemovePath, true) != null) { doMultiWithRetries(Op.delete(nodeRemovePath, -1)); } else { - LOG.info("Attempted to delete a non-existing znode " + nodeRemovePath); + LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); + } + } + + @Override + public synchronized void deleteStore() throws Exception { + if (existsWithRetries(zkRootNodePath, true) != null) { + deleteWithRetries(zkRootNodePath, true); } } @@ -816,7 +892,7 @@ public class ZKRMStateStore extends RMSt case Expired: // the connection got terminated because of session timeout // call listener to reconnect - LOG.info("Session expired"); + LOG.info("ZKRMStateStore Session expired"); createConnection(); break; default: @@ -921,6 +997,29 @@ public class ZKRMStateStore extends RMSt }.runWithRetries(); } + private void deleteWithRetries( + final String path, final boolean watch) throws Exception { + new ZKAction<Void>() { + @Override + Void run() throws KeeperException, InterruptedException { + recursiveDeleteWithRetriesHelper(path, watch); + return null; + } + }.runWithRetries(); + } + + /** + * Helper method that deletes znodes recursively + */ + private void recursiveDeleteWithRetriesHelper(String path, boolean watch) + throws KeeperException, InterruptedException { + List<String> children = zkClient.getChildren(path, watch); + for (String child : children) { + recursiveDeleteWithRetriesHelper(path + "/" + child, false); + } + zkClient.delete(path, -1); + } + /** * Helper class that periodically attempts creating a znode to ensure that * this RM continues to be the Active. @@ -991,13 +1090,13 @@ public class ZKRMStateStore extends RMSt throw new StoreFencedException(); } } catch (KeeperException ke) { + LOG.info("Exception while executing a ZK operation.", ke); if (shouldRetry(ke.code()) && ++retry < numRetries) { - LOG.info("Waiting for zookeeper to be connected, retry no. + " - + retry); + LOG.info("Retrying operation on ZK. Retry no. " + retry); Thread.sleep(zkRetryInterval); continue; } - LOG.debug("Error while doing ZK operation.", ke); + LOG.info("Maxed out ZK retries. Giving up!"); throw ke; } } @@ -1044,4 +1143,19 @@ public class ZKRMStateStore extends RMSt return zk; } + @Override + public synchronized void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState amrmTokenSecretManagerState, + boolean isUpdate) { + AMRMTokenSecretManagerState data = + AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); + byte[] stateData = data.getProto().toByteArray(); + try { + setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1); + } catch (Exception ex) { + LOG.info("Error storing info for AMRMTokenSecretManager", ex); + notifyStoreOperationFailed(ex); + } + } + } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java Tue Aug 19 23:49:39 2014 @@ -18,31 +18,74 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.util.Records; /* * Contains the state data that needs to be persisted for an ApplicationAttempt */ @Public @Unstable -public interface ApplicationAttemptStateData { - +public abstract class ApplicationAttemptStateData { + public static ApplicationAttemptStateData newInstance( + ApplicationAttemptId attemptId, Container container, + ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, + String finalTrackingUrl, String diagnostics, + FinalApplicationStatus amUnregisteredFinalStatus, int exitStatus) { + ApplicationAttemptStateData attemptStateData = + Records.newRecord(ApplicationAttemptStateData.class); + attemptStateData.setAttemptId(attemptId); + attemptStateData.setMasterContainer(container); + attemptStateData.setAppAttemptTokens(attemptTokens); + attemptStateData.setState(finalState); + attemptStateData.setFinalTrackingUrl(finalTrackingUrl); + attemptStateData.setDiagnostics(diagnostics); + attemptStateData.setStartTime(startTime); + attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); + attemptStateData.setAMContainerExitStatus(exitStatus); + return attemptStateData; + } + + public static ApplicationAttemptStateData newInstance( + ApplicationAttemptState attemptState) throws IOException { + Credentials credentials = attemptState.getAppAttemptCredentials(); + ByteBuffer appAttemptTokens = null; + if (credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + appAttemptTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + } + return newInstance(attemptState.getAttemptId(), + attemptState.getMasterContainer(), appAttemptTokens, + attemptState.getStartTime(), attemptState.getState(), + attemptState.getFinalTrackingUrl(), attemptState.getDiagnostics(), + attemptState.getFinalApplicationStatus(), + attemptState.getAMContainerExitStatus()); + } + + public abstract ApplicationAttemptStateDataProto getProto(); + /** * The ApplicationAttemptId for the application attempt * @return ApplicationAttemptId for the application attempt */ @Public @Unstable - public ApplicationAttemptId getAttemptId(); + public abstract ApplicationAttemptId getAttemptId(); - public void setAttemptId(ApplicationAttemptId attemptId); + public abstract void setAttemptId(ApplicationAttemptId attemptId); /* * The master container running the application attempt @@ -50,9 +93,9 @@ public interface ApplicationAttemptState */ @Public @Unstable - public Container getMasterContainer(); + public abstract Container getMasterContainer(); - public void setMasterContainer(Container container); + public abstract void setMasterContainer(Container container); /** * The application attempt tokens that belong to this attempt @@ -60,17 +103,17 @@ public interface ApplicationAttemptState */ @Public @Unstable - public ByteBuffer getAppAttemptTokens(); + public abstract ByteBuffer getAppAttemptTokens(); - public void setAppAttemptTokens(ByteBuffer attemptTokens); + public abstract void setAppAttemptTokens(ByteBuffer attemptTokens); /** * Get the final state of the application attempt. * @return the final state of the application attempt. */ - public RMAppAttemptState getState(); + public abstract RMAppAttemptState getState(); - public void setState(RMAppAttemptState state); + public abstract void setState(RMAppAttemptState state); /** * Get the original not-proxied <em>final tracking url</em> for the @@ -79,34 +122,39 @@ public interface ApplicationAttemptState * @return the original not-proxied <em>final tracking url</em> for the * application */ - public String getFinalTrackingUrl(); + public abstract String getFinalTrackingUrl(); /** * Set the final tracking Url of the AM. * @param url */ - public void setFinalTrackingUrl(String url); + public abstract void setFinalTrackingUrl(String url); /** * Get the <em>diagnositic information</em> of the attempt * @return <em>diagnositic information</em> of the attempt */ - public String getDiagnostics(); + public abstract String getDiagnostics(); - public void setDiagnostics(String diagnostics); + public abstract void setDiagnostics(String diagnostics); /** * Get the <em>start time</em> of the application. * @return <em>start time</em> of the application */ - public long getStartTime(); + public abstract long getStartTime(); - public void setStartTime(long startTime); + public abstract void setStartTime(long startTime); /** * Get the <em>final finish status</em> of the application. * @return <em>final finish status</em> of the application */ - public FinalApplicationStatus getFinalApplicationStatus(); + public abstract FinalApplicationStatus getFinalApplicationStatus(); + + public abstract void setFinalApplicationStatus( + FinalApplicationStatus finishState); + + public abstract int getAMContainerExitStatus(); - public void setFinalApplicationStatus(FinalApplicationStatus finishState); + public abstract void setAMContainerExitStatus(int exitStatus); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java Tue Aug 19 23:49:39 2014 @@ -24,7 +24,10 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.util.Records; /** * Contains all the state data that needs to be stored persistently @@ -32,19 +35,43 @@ import org.apache.hadoop.yarn.server.res */ @Public @Unstable -public interface ApplicationStateData { - +public abstract class ApplicationStateData { + public static ApplicationStateData newInstance(long submitTime, + long startTime, String user, + ApplicationSubmissionContext submissionContext, + RMAppState state, String diagnostics, long finishTime) { + ApplicationStateData appState = Records.newRecord(ApplicationStateData.class); + appState.setSubmitTime(submitTime); + appState.setStartTime(startTime); + appState.setUser(user); + appState.setApplicationSubmissionContext(submissionContext); + appState.setState(state); + appState.setDiagnostics(diagnostics); + appState.setFinishTime(finishTime); + return appState; + } + + public static ApplicationStateData newInstance( + ApplicationState appState) { + return newInstance(appState.getSubmitTime(), appState.getStartTime(), + appState.getUser(), appState.getApplicationSubmissionContext(), + appState.getState(), appState.getDiagnostics(), + appState.getFinishTime()); + } + + public abstract ApplicationStateDataProto getProto(); + /** * The time at which the application was received by the Resource Manager * @return submitTime */ @Public @Unstable - public long getSubmitTime(); + public abstract long getSubmitTime(); @Public @Unstable - public void setSubmitTime(long submitTime); + public abstract void setSubmitTime(long submitTime); /** * Get the <em>start time</em> of the application. @@ -63,11 +90,11 @@ public interface ApplicationStateData { */ @Public @Unstable - public void setUser(String user); + public abstract void setUser(String user); @Public @Unstable - public String getUser(); + public abstract String getUser(); /** * The {@link ApplicationSubmissionContext} for the application @@ -76,34 +103,34 @@ public interface ApplicationStateData { */ @Public @Unstable - public ApplicationSubmissionContext getApplicationSubmissionContext(); + public abstract ApplicationSubmissionContext getApplicationSubmissionContext(); @Public @Unstable - public void setApplicationSubmissionContext( + public abstract void setApplicationSubmissionContext( ApplicationSubmissionContext context); /** * Get the final state of the application. * @return the final state of the application. */ - public RMAppState getState(); + public abstract RMAppState getState(); - public void setState(RMAppState state); + public abstract void setState(RMAppState state); /** * Get the diagnostics information for the application master. * @return the diagnostics information for the application master. */ - public String getDiagnostics(); + public abstract String getDiagnostics(); - public void setDiagnostics(String diagnostics); + public abstract void setDiagnostics(String diagnostics); /** * The finish time of the application. * @return the finish time of the application., */ - public long getFinishTime(); + public abstract long getFinishTime(); - public void setFinishTime(long finishTime); + public abstract void setFinishTime(long finishTime); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationAttemptStateDataPBImpl.java Tue Aug 19 23:49:39 2014 @@ -25,10 +25,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder; @@ -36,12 +33,10 @@ import org.apache.hadoop.yarn.proto.Yarn import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -public class ApplicationAttemptStateDataPBImpl -extends ProtoBase<ApplicationAttemptStateDataProto> -implements ApplicationAttemptStateData { - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); +import com.google.protobuf.TextFormat; +public class ApplicationAttemptStateDataPBImpl extends + ApplicationAttemptStateData { ApplicationAttemptStateDataProto proto = ApplicationAttemptStateDataProto.getDefaultInstance(); ApplicationAttemptStateDataProto.Builder builder = null; @@ -60,7 +55,8 @@ implements ApplicationAttemptStateData { this.proto = proto; viaProto = true; } - + + @Override public ApplicationAttemptStateDataProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); @@ -76,7 +72,8 @@ implements ApplicationAttemptStateData { builder.setMasterContainer(((ContainerPBImpl)masterContainer).getProto()); } if(this.appAttemptTokens != null) { - builder.setAppAttemptTokens(convertToProtoFormat(this.appAttemptTokens)); + builder.setAppAttemptTokens(ProtoUtils.convertToProtoFormat( + this.appAttemptTokens)); } } @@ -148,7 +145,8 @@ implements ApplicationAttemptStateData { if(!p.hasAppAttemptTokens()) { return null; } - this.appAttemptTokens = convertFromProtoFormat(p.getAppAttemptTokens()); + this.appAttemptTokens = ProtoUtils.convertFromProtoFormat( + p.getAppAttemptTokens()); return appAttemptTokens; } @@ -249,24 +247,39 @@ implements ApplicationAttemptStateData { builder.setFinalApplicationStatus(convertToProtoFormat(finishState)); } - public static ApplicationAttemptStateData newApplicationAttemptStateData( - ApplicationAttemptId attemptId, Container container, - ByteBuffer attemptTokens, long startTime, RMAppAttemptState finalState, - String finalTrackingUrl, String diagnostics, - FinalApplicationStatus amUnregisteredFinalStatus) { - ApplicationAttemptStateData attemptStateData = - recordFactory.newRecordInstance(ApplicationAttemptStateData.class); - attemptStateData.setAttemptId(attemptId); - attemptStateData.setMasterContainer(container); - attemptStateData.setAppAttemptTokens(attemptTokens); - attemptStateData.setState(finalState); - attemptStateData.setFinalTrackingUrl(finalTrackingUrl); - attemptStateData.setDiagnostics(diagnostics); - attemptStateData.setStartTime(startTime); - attemptStateData.setFinalApplicationStatus(amUnregisteredFinalStatus); - return attemptStateData; + @Override + public int hashCode() { + return getProto().hashCode(); } + @Override + public int getAMContainerExitStatus() { + ApplicationAttemptStateDataProtoOrBuilder p = viaProto ? proto : builder; + return p.getAmContainerExitStatus(); + } + + @Override + public void setAMContainerExitStatus(int exitStatus) { + maybeInitBuilder(); + builder.setAmContainerExitStatus(exitStatus); + } + + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + private static String RM_APP_ATTEMPT_PREFIX = "RMATTEMPT_"; public static RMAppAttemptStateProto convertToProtoFormat(RMAppAttemptState e) { return RMAppAttemptStateProto.valueOf(RM_APP_ATTEMPT_PREFIX + e.name()); @@ -281,5 +294,4 @@ implements ApplicationAttemptStateData { private FinalApplicationStatus convertFromProtoFormat(FinalApplicationStatusProto s) { return ProtoUtils.convertFromProtoFormat(s); } - } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java Tue Aug 19 23:49:39 2014 @@ -20,21 +20,15 @@ package org.apache.hadoop.yarn.server.re import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMAppStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -public class ApplicationStateDataPBImpl -extends ProtoBase<ApplicationStateDataProto> -implements ApplicationStateData { - private static final RecordFactory recordFactory = RecordFactoryProvider - .getRecordFactory(null); +import com.google.protobuf.TextFormat; +public class ApplicationStateDataPBImpl extends ApplicationStateData { ApplicationStateDataProto proto = ApplicationStateDataProto.getDefaultInstance(); ApplicationStateDataProto.Builder builder = null; @@ -51,7 +45,8 @@ implements ApplicationStateData { this.proto = proto; viaProto = true; } - + + @Override public ApplicationStateDataProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); @@ -136,7 +131,7 @@ implements ApplicationStateData { } applicationSubmissionContext = new ApplicationSubmissionContextPBImpl( - p.getApplicationSubmissionContext()); + p.getApplicationSubmissionContext()); return applicationSubmissionContext; } @@ -200,21 +195,24 @@ implements ApplicationStateData { builder.setFinishTime(finishTime); } - public static ApplicationStateData newApplicationStateData(long submitTime, - long startTime, String user, - ApplicationSubmissionContext submissionContext, RMAppState state, - String diagnostics, long finishTime) { - - ApplicationStateData appState = - recordFactory.newRecordInstance(ApplicationStateData.class); - appState.setSubmitTime(submitTime); - appState.setStartTime(startTime); - appState.setUser(user); - appState.setApplicationSubmissionContext(submissionContext); - appState.setState(state); - appState.setDiagnostics(diagnostics); - appState.setFinishTime(finishTime); - return appState; + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); } private static String RM_APP_PREFIX = "RMAPP_"; Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Tue Aug 19 23:49:39 2014 @@ -19,16 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import java.util.Collection; - import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -208,6 +208,14 @@ public interface RMApp extends EventHand * @return the flag indicating whether the applications's state is stored. */ boolean isAppFinalStateStored(); + + + /** + * Nodes on which the containers for this {@link RMApp} ran. + * @return the set of nodes that ran any containers from this {@link RMApp} + * Add more node on which containers for this {@link RMApp} ran + */ + Set<NodeId> getRanNodes(); /** * Create the external user-facing state of ApplicationMaster from the @@ -215,4 +223,11 @@ public interface RMApp extends EventHand * @return the external user-facing state of ApplicationMaster. */ YarnApplicationState createApplicationState(); + + /** + * Get RMAppMetrics of the {@link RMApp}. + * + * @return metrics + */ + RMAppMetrics getRMAppMetrics(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Tue Aug 19 23:49:39 2014 @@ -38,6 +38,9 @@ public enum RMAppEventType { ATTEMPT_FAILED, ATTEMPT_KILLED, NODE_UPDATE, + + // Source: Container and ResourceTracker + APP_RUNNING_ON_NODE, // Source: RMStateStore APP_NEW_SAVED,
