Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Wed Aug 20 01:34:29 2014 @@ -32,7 +32,6 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -45,24 +44,21 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; -import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; -import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; @@ -85,6 +81,8 @@ public abstract class RMStateStore exten protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_"; protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX = "RMDTSequenceNumber_"; + protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT = + "AMRMTokenSecretManagerRoot"; protected static final String VERSION_NODE = "RMVersionNode"; protected static final String EPOCH_NODE = "EpochNode"; @@ -134,7 +132,8 @@ public abstract class RMStateStore exten LOG.info("Storing info for app: " + appId); try { store.storeApplicationStateInternal(appId, appStateData); - store.notifyDoneStoringApplication(appId, null); + store.notifyApplication(new RMAppEvent(appId, + RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing app: " + appId, e); store.notifyStoreOperationFailed(e); @@ -158,7 +157,8 @@ public abstract class RMStateStore exten LOG.info("Updating info for app: " + appId); try { store.updateApplicationStateInternal(appId, appStateData); - store.notifyDoneUpdatingApplication(appId, null); + store.notifyApplication(new RMAppEvent(appId, + RMAppEventType.APP_UPDATE_SAVED)); } catch (Exception e) { LOG.error("Error updating app: " + appId, e); store.notifyStoreOperationFailed(e); @@ -207,8 +207,9 @@ public abstract class RMStateStore exten } store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptStateData); - store.notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(), - null); + store.notifyApplicationAttempt(new RMAppAttemptEvent + (attemptState.getAttemptId(), + RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); store.notifyStoreOperationFailed(e); @@ -235,8 +236,9 @@ public abstract class RMStateStore exten } store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptStateData); - store.notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(), - null); + store.notifyApplicationAttempt(new RMAppAttemptEvent + (attemptState.getAttemptId(), + RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); } catch (Exception e) { LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); store.notifyStoreOperationFailed(e); @@ -412,6 +414,8 @@ public abstract class RMStateStore exten RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState(); + AMRMTokenSecretManagerState amrmTokenSecretManagerState = null; + public Map<ApplicationId, ApplicationState> getApplicationState() { return appState; } @@ -419,6 +423,10 @@ public abstract class RMStateStore exten public RMDTSecretManagerState getRMDTSecretManagerState() { return rmSecretManagerState; } + + public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() { + return amrmTokenSecretManagerState; + } } private Dispatcher rmDispatcher; @@ -487,14 +495,14 @@ public abstract class RMStateStore exten * upgrade RM state. 
*/ public void checkVersion() throws Exception { - RMStateVersion loadedVersion = loadVersion(); + Version loadedVersion = loadVersion(); LOG.info("Loaded RM state version info " + loadedVersion); if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) { return; } // if there is no version info, treat it as 1.0; if (loadedVersion == null) { - loadedVersion = RMStateVersion.newInstance(1, 0); + loadedVersion = Version.newInstance(1, 0); } if (loadedVersion.isCompatibleTo(getCurrentVersion())) { LOG.info("Storing RM state version info " + getCurrentVersion()); @@ -510,7 +518,7 @@ public abstract class RMStateStore exten * Derived class use this method to load the version information from state * store. */ - protected abstract RMStateVersion loadVersion() throws Exception; + protected abstract Version loadVersion() throws Exception; /** * Derived class use this method to store the version information. @@ -520,7 +528,7 @@ public abstract class RMStateStore exten /** * Get the current version of the underlying state store. */ - protected abstract RMStateVersion getCurrentVersion(); + protected abstract Version getCurrentVersion(); /** @@ -714,6 +722,14 @@ public abstract class RMStateStore exten throws Exception; /** + * Blocking API Derived classes must implement this method to store or update + * the state of AMRMToken Master Key + */ + public abstract void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState amrmTokenSecretManagerState, + boolean isUpdate); + + /** * Non-blocking API * ResourceManager services call this to remove an application from the state * store @@ -755,10 +771,7 @@ public abstract class RMStateStore exten public Credentials getCredentialsFromAppAttempt(RMAppAttempt appAttempt) { Credentials credentials = new Credentials(); - Token<AMRMTokenIdentifier> appToken = appAttempt.getAMRMToken(); - if(appToken != null){ - credentials.addToken(AM_RM_TOKEN_SERVICE, appToken); - } + SecretKey clientTokenMasterKey = appAttempt.getClientTokenMasterKey(); if(clientTokenMasterKey != null){ @@ -792,47 +805,28 @@ public abstract class RMStateStore exten } rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause)); } - + @SuppressWarnings("unchecked") /** - * In (@link handleStoreEvent}, this method is called to notify the - * application that new application is stored in state store - * @param appId id of the application that has been saved - * @param storedException the exception that is thrown when storing the - * application - */ - private void notifyDoneStoringApplication(ApplicationId appId, - Exception storedException) { - rmDispatcher.getEventHandler().handle( - new RMAppNewSavedEvent(appId, storedException)); - } - - @SuppressWarnings("unchecked") - private void notifyDoneUpdatingApplication(ApplicationId appId, - Exception storedException) { - rmDispatcher.getEventHandler().handle( - new RMAppUpdateSavedEvent(appId, storedException)); + * This method is called to notify the application that + * new application is stored or updated in state store + * @param event App event containing the app id and event type + */ + private void notifyApplication(RMAppEvent event) { + rmDispatcher.getEventHandler().handle(event); } - + @SuppressWarnings("unchecked") /** - * In (@link handleStoreEvent}, this method is called to notify the - * application attempt that new attempt is stored in state store - * @param appAttempt attempt that has been saved - */ - private void notifyDoneStoringApplicationAttempt(ApplicationAttemptId attemptId, - 
Exception storedException) { - rmDispatcher.getEventHandler().handle( - new RMAppAttemptNewSavedEvent(attemptId, storedException)); - } - - @SuppressWarnings("unchecked") - private void notifyDoneUpdatingApplicationAttempt(ApplicationAttemptId attemptId, - Exception updatedException) { - rmDispatcher.getEventHandler().handle( - new RMAppAttemptUpdateSavedEvent(attemptId, updatedException)); + * This method is called to notify the application attempt + * that new attempt is stored or updated in state store + * @param event App attempt event containing the app attempt + * id and event type + */ + private void notifyApplicationAttempt(RMAppAttemptEvent event) { + rmDispatcher.getEventHandler().handle(event); } - + /** * EventHandler implementation which forward events to the FSRMStateStore * This hides the EventHandle methods of the store from its public interface
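The RMStateStore hunks above replace the per-operation RMAppNewSavedEvent/RMAppUpdateSavedEvent (and their attempt-side counterparts, which carried a stored exception) with plain RMAppEvent/RMAppAttemptEvent notifications that only carry the id and event type; failures are routed through notifyStoreOperationFailed, which raises an RMFatalEvent, so HA fencing can be handled in one place instead of inside every saved-event handler. The sketch below illustrates that flow under stated assumptions: SketchStateStore, the String appId, and the Consumer-based dispatcher are stand-ins for illustration only, not the real RMStateStore/AsyncDispatcher types; only notifyApplication, notifyStoreOperationFailed, and RMAppEventType.APP_NEW_SAVED are taken from the diff.

```java
// Simplified sketch (assumed names, not the actual Hadoop classes) of the
// notification flow introduced by this change: a successful store emits a
// generic event with just the id and event type, while a storage failure is
// reported via notifyStoreOperationFailed() instead of being embedded in a
// *SavedEvent.
import java.util.function.Consumer;

enum RMAppEventType { APP_NEW_SAVED, APP_UPDATE_SAVED }

class RMAppEvent {
    final String appId;
    final RMAppEventType type;

    RMAppEvent(String appId, RMAppEventType type) {
        this.appId = appId;
        this.type = type;
    }

    @Override
    public String toString() {
        return type + " for " + appId;
    }
}

class SketchStateStore {
    // Stands in for rmDispatcher.getEventHandler() in the real store.
    private final Consumer<Object> dispatcher;

    SketchStateStore(Consumer<Object> dispatcher) {
        this.dispatcher = dispatcher;
    }

    // Mirrors the store-application branch of handleStoreEvent(): persist,
    // then notify with a generic RMAppEvent on success.
    void storeApplication(String appId, byte[] appStateData) {
        try {
            storeApplicationStateInternal(appId, appStateData);
            notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
        } catch (Exception e) {
            // Failure no longer travels inside the saved-event; it becomes a
            // store-operation failure (an RMFatalEvent in the real code).
            notifyStoreOperationFailed(e);
        }
    }

    private void storeApplicationStateInternal(String appId, byte[] data) throws Exception {
        // The real implementation persists to ZooKeeper or the filesystem here.
    }

    private void notifyApplication(RMAppEvent event) {
        dispatcher.accept(event);
    }

    private void notifyStoreOperationFailed(Exception cause) {
        dispatcher.accept(cause);
    }

    public static void main(String[] args) {
        SketchStateStore store =
            new SketchStateStore(ev -> System.out.println("dispatched: " + ev));
        store.storeApplication("application_1408498000000_0001", new byte[0]);
    }
}
```

Because the events no longer carry a stored exception, RMAppImpl and RMAppAttemptImpl drop their per-transition exception checks, as the later hunks in this commit show.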
Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Wed Aug 20 01:34:29 2014 @@ -44,22 +44,23 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; - import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -77,6 +78,11 @@ import org.apache.zookeeper.server.auth. import com.google.common.annotations.VisibleForTesting; +/** + * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved + * separately. 
The currentMasterkey and nextMasterkey have been stored. + * Also, AMRMToken has been removed from ApplicationAttemptState. + */ @Private @Unstable public class ZKRMStateStore extends RMStateStore { @@ -85,8 +91,8 @@ public class ZKRMStateStore extends RMSt private final SecureRandom random = new SecureRandom(); protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; - protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion - .newInstance(1, 1); + protected static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 2); private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = "RMDelegationTokensRoot"; private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = @@ -128,6 +134,9 @@ public class ZKRMStateStore extends RMSt * | |----- Key_1 * | |----- Key_2 * .... + * |--- AMRMTOKEN_SECRET_MANAGER_ROOT + * |----- currentMasterKey + * |----- nextMasterKey * */ private String zkRootNodePath; @@ -136,6 +145,7 @@ public class ZKRMStateStore extends RMSt private String dtMasterKeysRootPath; private String delegationTokensRootPath; private String dtSequenceNumberPath; + private String amrmTokenSecretManagerRoot; @VisibleForTesting protected String znodeWorkingPath; @@ -255,6 +265,8 @@ public class ZKRMStateStore extends RMSt RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME); dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot, RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME); + amrmTokenSecretManagerRoot = + getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); } @Override @@ -275,6 +287,7 @@ public class ZKRMStateStore extends RMSt createRootDir(dtMasterKeysRootPath); createRootDir(delegationTokensRootPath); createRootDir(dtSequenceNumberPath); + createRootDir(amrmTokenSecretManagerRoot); } private void createRootDir(final String rootPath) throws Exception { @@ -369,7 +382,7 @@ public class ZKRMStateStore extends RMSt } @Override - protected RMStateVersion getCurrentVersion() { + protected Version getCurrentVersion() { return CURRENT_VERSION_INFO; } @@ -377,7 +390,7 @@ public class ZKRMStateStore extends RMSt protected synchronized void storeVersion() throws Exception { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); byte[] data = - ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); if (existsWithRetries(versionNodePath, true) != null) { setDataWithRetries(versionNodePath, data, -1); } else { @@ -386,13 +399,13 @@ public class ZKRMStateStore extends RMSt } @Override - protected synchronized RMStateVersion loadVersion() throws Exception { + protected synchronized Version loadVersion() throws Exception { String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE); if (existsWithRetries(versionNodePath, true) != null) { byte[] data = getDataWithRetries(versionNodePath, true); - RMStateVersion version = - new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data)); + Version version = + new VersionPBImpl(VersionProto.parseFrom(data)); return version; } return null; @@ -427,9 +440,27 @@ public class ZKRMStateStore extends RMSt loadRMDTSecretManagerState(rmState); // recover RM applications loadRMAppState(rmState); + // recover AMRMTokenSecretManager + loadAMRMTokenSecretManagerState(rmState); return rmState; } + private void loadAMRMTokenSecretManagerState(RMState rmState) + throws Exception { + byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true); + if (data == null) { + LOG.warn("There is no data saved"); + return; + } + 
AMRMTokenSecretManagerStatePBImpl stateData = + new AMRMTokenSecretManagerStatePBImpl( + AMRMTokenSecretManagerStateProto.parseFrom(data)); + rmState.amrmTokenSecretManagerState = + AMRMTokenSecretManagerState.newInstance( + stateData.getCurrentMasterKey(), stateData.getNextMasterKey()); + + } + private synchronized void loadRMDTSecretManagerState(RMState rmState) throws Exception { loadRMDelegationKeyState(rmState); @@ -1112,4 +1143,19 @@ public class ZKRMStateStore extends RMSt return zk; } + @Override + public synchronized void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState amrmTokenSecretManagerState, + boolean isUpdate) { + AMRMTokenSecretManagerState data = + AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); + byte[] stateData = data.getProto().toByteArray(); + try { + setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1); + } catch (Exception ex) { + LOG.info("Error storing info for AMRMTokenSecretManager", ex); + notifyStoreOperationFailed(ex); + } + } + } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Wed Aug 20 01:34:29 2014 @@ -166,6 +166,8 @@ public class RMAppImpl implements RMApp, RMAppEventType.APP_REJECTED, new FinalSavingTransition(new AppRejectedTransition(), RMAppState.FAILED)) + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, + RMAppEventType.MOVE, new RMAppMoveTransition()) // Transitions from SUBMITTED state .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, @@ -243,7 +245,7 @@ public class RMAppImpl implements RMApp, // ignorable transitions .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, - RMAppEventType.APP_NEW_SAVED)) + RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE)) // Transitions from FINISHING state .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, @@ -254,9 +256,9 @@ public class RMAppImpl implements RMApp, // ignorable transitions .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, EnumSet.of(RMAppEventType.NODE_UPDATE, - // ignore Kill as we have already saved the final Finished state in - // state store. - RMAppEventType.KILL)) + // ignore Kill/Move as we have already saved the final Finished state + // in state store. 
+ RMAppEventType.KILL, RMAppEventType.MOVE)) // Transitions from KILLING state .addTransition(RMAppState.KILLING, RMAppState.KILLING, @@ -274,7 +276,7 @@ public class RMAppImpl implements RMApp, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, RMAppEventType.APP_UPDATE_SAVED, - RMAppEventType.KILL)) + RMAppEventType.KILL, RMAppEventType.MOVE)) // Transitions from FINISHED state // ignorable transitions @@ -286,7 +288,7 @@ public class RMAppImpl implements RMApp, RMAppEventType.NODE_UPDATE, RMAppEventType.ATTEMPT_UNREGISTERED, RMAppEventType.ATTEMPT_FINISHED, - RMAppEventType.KILL)) + RMAppEventType.KILL, RMAppEventType.MOVE)) // Transitions from FAILED state // ignorable transitions @@ -294,7 +296,8 @@ public class RMAppImpl implements RMApp, RMAppEventType.APP_RUNNING_ON_NODE, new AppRunningOnNodeTransition()) .addTransition(RMAppState.FAILED, RMAppState.FAILED, - EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) + EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE, + RMAppEventType.MOVE)) // Transitions from KILLED state // ignorable transitions @@ -307,7 +310,7 @@ public class RMAppImpl implements RMApp, EnumSet.of(RMAppEventType.APP_ACCEPTED, RMAppEventType.APP_REJECTED, RMAppEventType.KILL, RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, - RMAppEventType.NODE_UPDATE)) + RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE)) .installTopology(); @@ -820,17 +823,6 @@ public class RMAppImpl implements RMApp, RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - if (event instanceof RMAppNewSavedEvent) { - RMAppNewSavedEvent storeEvent = (RMAppNewSavedEvent) event; - // For HA this exception needs to be handled by giving up - // master status if we got fenced - if (((RMAppNewSavedEvent) event).getStoredException() != null) { - LOG.error( - "Failed to store application: " + storeEvent.getApplicationId(), - storeEvent.getStoredException()); - ExitUtil.terminate(1, storeEvent.getStoredException()); - } - } app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user)); } @@ -848,13 +840,6 @@ public class RMAppImpl implements RMApp, @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - RMAppUpdateSavedEvent storeEvent = (RMAppUpdateSavedEvent) event; - if (storeEvent.getUpdatedException() != null) { - LOG.error("Failed to update the final state of application" - + storeEvent.getApplicationId(), storeEvent.getUpdatedException()); - ExitUtil.terminate(1, storeEvent.getUpdatedException()); - } - if (app.transitionTodo instanceof SingleArcTransition) { ((SingleArcTransition) app.transitionTodo).transition(app, app.eventCausingFinalSaving); @@ -1191,6 +1176,9 @@ public class RMAppImpl implements RMApp, public static boolean isAppInFinalState(RMApp rmApp) { RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState(); + if (appState == null) { + appState = rmApp.getState(); + } return appState == RMAppState.FAILED || appState == RMAppState.FINISHED || appState == RMAppState.KILLED; } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Wed Aug 20 01:34:29 2014 @@ -37,6 +37,7 @@ import org.apache.commons.lang.StringUti import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -79,11 +80,9 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptNewSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUpdateSavedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -217,7 +216,13 @@ public class RMAppAttemptImpl implements RMAppAttemptEventType.KILL, new FinalSavingTransition(new BaseFinalTransition( RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) - + .addTransition(RMAppAttemptState.SCHEDULED, + RMAppAttemptState.FINAL_SAVING, + RMAppAttemptEventType.CONTAINER_FINISHED, + new FinalSavingTransition( + new AMContainerCrashedBeforeRunningTransition(), + RMAppAttemptState.FAILED)) + // Transitions from ALLOCATED_SAVING State .addTransition(RMAppAttemptState.ALLOCATED_SAVING, RMAppAttemptState.ALLOCATED, @@ -391,7 +396,6 @@ public class RMAppAttemptImpl implements RMAppAttemptState.KILLED, RMAppAttemptState.KILLED, EnumSet.of(RMAppAttemptEventType.ATTEMPT_ADDED, - RMAppAttemptEventType.EXPIRE, RMAppAttemptEventType.LAUNCHED, RMAppAttemptEventType.LAUNCH_FAILED, RMAppAttemptEventType.EXPIRE, @@ -553,7 +557,22 @@ public class RMAppAttemptImpl implements @Override public Token<AMRMTokenIdentifier> getAMRMToken() { - return this.amrmToken; + this.readLock.lock(); + try { + return this.amrmToken; + } finally { + this.readLock.unlock(); + } + } + + @Private + public void 
setAMRMToken(Token<AMRMTokenIdentifier> lastToken) { + this.writeLock.lock(); + try { + this.amrmToken = lastToken; + } finally { + this.writeLock.unlock(); + } } @Override @@ -707,7 +726,8 @@ public class RMAppAttemptImpl implements this.attemptMetrics.setIsPreempted(); } setMasterContainer(attemptState.getMasterContainer()); - recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials()); + recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(), + attemptState.getState()); this.recoveredFinalState = attemptState.getState(); this.originalTrackingUrl = attemptState.getFinalTrackingUrl(); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); @@ -719,9 +739,11 @@ public class RMAppAttemptImpl implements this.justFinishedContainers = attempt.getJustFinishedContainers(); } - private void recoverAppAttemptCredentials(Credentials appAttemptTokens) - throws IOException { - if (appAttemptTokens == null) { + private void recoverAppAttemptCredentials(Credentials appAttemptTokens, + RMAppAttemptState state) throws IOException { + if (appAttemptTokens == null || state == RMAppAttemptState.FAILED + || state == RMAppAttemptState.FINISHED + || state == RMAppAttemptState.KILLED) { return; } @@ -732,12 +754,9 @@ public class RMAppAttemptImpl implements .registerMasterKey(applicationAttemptId, clientTokenMasterKeyBytes); } - // Only one AMRMToken is stored per-attempt, so this should be fine. Can't - // use TokenSelector as service may change - think fail-over. this.amrmToken = - (Token<AMRMTokenIdentifier>) appAttemptTokens - .getToken(RMStateStore.AM_RM_TOKEN_SERVICE); - rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken); + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + applicationAttemptId); } private static class BaseTransition implements @@ -773,11 +792,6 @@ public class RMAppAttemptImpl implements .createMasterKey(appAttempt.applicationAttemptId); } - // create AMRMToken - appAttempt.amrmToken = - appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( - appAttempt.applicationAttemptId); - // Add the applicationAttempt to the scheduler and inform the scheduler // whether to transfer the state from previous attempt. appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( @@ -889,7 +903,6 @@ public class RMAppAttemptImpl implements @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - appAttempt.checkAttemptStoreError(event); appAttempt.launchAttempt(); } } @@ -1041,14 +1054,6 @@ public class RMAppAttemptImpl implements @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptUpdateSavedEvent storeEvent = (RMAppAttemptUpdateSavedEvent) event; - if (storeEvent.getUpdatedException() != null) { - LOG.error("Failed to update the final state of application attempt: " - + storeEvent.getApplicationAttemptId(), - storeEvent.getUpdatedException()); - ExitUtil.terminate(1, storeEvent.getUpdatedException()); - } - RMAppAttemptEvent causeEvent = appAttempt.eventCausingFinalSaving; if (appAttempt.transitionTodo instanceof SingleArcTransition) { @@ -1178,12 +1183,11 @@ public class RMAppAttemptImpl implements @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - appAttempt.checkAttemptStoreError(event); - // TODO Today unmanaged AM client is waiting for app state to be Accepted to - // launch the AM. 
This is broken since we changed to start the attempt - // after the application is Accepted. We may need to introduce an attempt - // report that client can rely on to query the attempt state and choose to - // launch the unmanaged AM. + // create AMRMToken + appAttempt.amrmToken = + appAttempt.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttempt.applicationAttemptId); + super.transition(appAttempt, event); } } @@ -1671,18 +1675,6 @@ public class RMAppAttemptImpl implements rmContext.getAMLivelinessMonitor().register(getAppAttemptId()); } - private void checkAttemptStoreError(RMAppAttemptEvent event) { - RMAppAttemptNewSavedEvent storeEvent = (RMAppAttemptNewSavedEvent) event; - if(storeEvent.getStoredException() != null) - { - // This needs to be handled for HA and give up master status if we got - // fenced - LOG.error("Failed to store attempt: " + getAppAttemptId(), - storeEvent.getStoredException()); - ExitUtil.terminate(1, storeEvent.getStoredException()); - } - } - private void storeAttempt() { // store attempt data in a non-blocking manner to prevent dispatcher // thread starvation and wait for state to be saved Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Aug 20 01:34:29 2014 @@ -93,9 +93,9 @@ public class RMNodeImpl implements RMNod private final RMContext context; private final String hostName; private final int commandPort; - private final int httpPort; + private int httpPort; private final String nodeAddress; // The containerManager address - private final String httpAddress; + private String httpAddress; private volatile ResourceOption resourceOption; private final Node node; @@ -456,6 +456,24 @@ public class RMNodeImpl implements RMNod } } + private static void handleRunningAppOnNode(RMNodeImpl rmNode, + RMContext context, ApplicationId appId, NodeId nodeId) { + RMApp app = context.getRMApps().get(appId); + + // if we failed getting app by appId, maybe something wrong happened, just + // add the app to the finishedApplications list so that the app can be + // cleaned up on the NM + if (null == app) { + LOG.warn("Cannot get RMApp by appId=" + appId + + ", just added it to finishedApplications list for cleanup"); + rmNode.finishedApplications.add(appId); + return; + } + + context.getDispatcher().getEventHandler() + .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); + } + public static class AddNodeTransition implements SingleArcTransition<RMNodeImpl, RMNodeEvent> { @@ -496,24 +514,6 @@ public class RMNodeImpl implements RMNod new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); } - - void 
handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context, - ApplicationId appId, NodeId nodeId) { - RMApp app = context.getRMApps().get(appId); - - // if we failed getting app by appId, maybe something wrong happened, just - // add the app to the finishedApplications list so that the app can be - // cleaned up on the NM - if (null == app) { - LOG.warn("Cannot get RMApp by appId=" + appId - + ", just added it to finishedApplications list for cleanup"); - rmNode.finishedApplications.add(appId); - return; - } - - context.getDispatcher().getEventHandler() - .handle(new RMAppRunningOnNodeEvent(appId, nodeId)); - } } public static class ReconnectNodeTransition implements @@ -521,36 +521,22 @@ public class RMNodeImpl implements RMNod @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - // Kill containers since node is rejoining. - rmNode.nodeUpdateQueue.clear(); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeRemovedSchedulerEvent(rmNode)); - - RMNode newNode = ((RMNodeReconnectEvent)event).getReconnectedNode(); + RMNodeReconnectEvent reconnectEvent = (RMNodeReconnectEvent) event; + RMNode newNode = reconnectEvent.getReconnectedNode(); rmNode.nodeManagerVersion = newNode.getNodeManagerVersion(); - if (rmNode.getTotalCapability().equals(newNode.getTotalCapability()) - && rmNode.getHttpPort() == newNode.getHttpPort()) { - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - if (rmNode.getState() != NodeState.UNHEALTHY) { - // Only add new node if old state is not UNHEALTHY - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); - } - } else { - // Reconnected node differs, so replace old node and start new node - switch (rmNode.getState()) { - case RUNNING: - ClusterMetrics.getMetrics().decrNumActiveNodes(); - break; - case UNHEALTHY: - ClusterMetrics.getMetrics().decrNumUnhealthyNMs(); - break; + rmNode.httpPort = newNode.getHttpPort(); + rmNode.httpAddress = newNode.getHttpAddress(); + rmNode.resourceOption = newNode.getResourceOption(); + + // Reset heartbeat ID since node just restarted. 
+ rmNode.getLastNodeHeartBeatResponse().setResponseId(0); + + if (null != reconnectEvent.getRunningApplications()) { + for (ApplicationId appId : reconnectEvent.getRunningApplications()) { + handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } - rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); - rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeStartedEvent(newNode.getNodeID(), null, null)); } + rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java Wed Aug 20 01:34:29 2014 @@ -18,17 +18,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; public class RMNodeReconnectEvent extends RMNodeEvent { private RMNode reconnectedNode; + private List<ApplicationId> runningApplications; - public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode) { + public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode, + List<ApplicationId> runningApps) { super(nodeId, RMNodeEventType.RECONNECTED); reconnectedNode = newNode; + runningApplications = runningApps; } public RMNode getReconnectedNode() { return reconnectedNode; } + + public List<ApplicationId> getRunningApplications() { + return runningApplications; + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java Wed Aug 20 01:34:29 2014 @@ -18,14 +18,19 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -33,21 +38,34 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.util.concurrent.SettableFuture; + + @SuppressWarnings("unchecked") public abstract class AbstractYarnScheduler <T extends SchedulerApplicationAttempt, N extends SchedulerNode> @@ -66,6 +84,7 @@ public abstract class AbstractYarnSchedu protected RMContext rmContext; protected Map<ApplicationId, SchedulerApplication<T>> applications; + protected int nmExpireInterval; protected final static List<Container> EMPTY_CONTAINER_LIST = new ArrayList<Container>(); @@ -81,6 +100,15 @@ public abstract class AbstractYarnSchedu super(name); } + @Override + public void serviceInit(Configuration conf) throws Exception { + nmExpireInterval = + conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); + createReleaseCache(); + super.serviceInit(conf); + } + public synchronized List<Container> getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); @@ -275,6 +303,19 @@ 
public abstract class AbstractYarnSchedu ((RMContainerImpl)rmContainer).setAMContainer(true); } } + + synchronized (schedulerAttempt) { + Set<ContainerId> releases = schedulerAttempt.getPendingRelease(); + if (releases.contains(container.getContainerId())) { + // release the container + rmContainer.handle(new RMContainerFinishedEvent(container + .getContainerId(), SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED)); + releases.remove(container.getContainerId()); + LOG.info(container.getContainerId() + " is released by application."); + } + } } } @@ -314,7 +355,109 @@ public abstract class AbstractYarnSchedu } } + protected void createReleaseCache() { + // Cleanup the cache after nm expire interval. + new Timer().schedule(new TimerTask() { + @Override + public void run() { + for (SchedulerApplication<T> app : applications.values()) { + + T attempt = app.getCurrentAppAttempt(); + synchronized (attempt) { + for (ContainerId containerId : attempt.getPendingRelease()) { + RMAuditLogger.logFailure( + app.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", + "Scheduler", + "Trying to release container not owned by app or with invalid id.", + attempt.getApplicationId(), containerId); + } + attempt.getPendingRelease().clear(); + } + } + LOG.info("Release request cache is cleaned up"); + } + }, nmExpireInterval); + } + + // clean up a completed container + protected abstract void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event); + + protected void releaseContainers(List<ContainerId> containers, + SchedulerApplicationAttempt attempt) { + for (ContainerId containerId : containers) { + RMContainer rmContainer = getRMContainer(containerId); + if (rmContainer == null) { + if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() + < nmExpireInterval) { + LOG.info(containerId + " doesn't exist. 
Add the container" + + " to the release request cache as it maybe on recovery."); + synchronized (attempt) { + attempt.getPendingRelease().add(containerId); + } + } else { + RMAuditLogger.logFailure(attempt.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", "Scheduler", + "Trying to release container not owned by app or with invalid id.", + attempt.getApplicationId(), containerId); + } + } + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus(containerId, + SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); + } + } + public SchedulerNode getSchedulerNode(NodeId nodeId) { return nodes.get(nodeId); } + + @Override + public synchronized void moveAllApps(String sourceQueue, String destQueue) + throws YarnException { + // check if destination queue is a valid leaf queue + try { + getQueueInfo(destQueue, false, false); + } catch (IOException e) { + LOG.warn(e); + throw new YarnException(e); + } + // check if source queue is a valid + List<ApplicationAttemptId> apps = getAppsInQueue(sourceQueue); + if (apps == null) { + String errMsg = "The specified Queue: " + sourceQueue + " doesn't exist"; + LOG.warn(errMsg); + throw new YarnException(errMsg); + } + // generate move events for each pending/running app + for (ApplicationAttemptId app : apps) { + SettableFuture<Object> future = SettableFuture.create(); + this.rmContext + .getDispatcher() + .getEventHandler() + .handle(new RMAppMoveEvent(app.getApplicationId(), destQueue, future)); + } + } + + @Override + public synchronized void killAllAppsInQueue(String queueName) + throws YarnException { + // check if queue is a valid + List<ApplicationAttemptId> apps = getAppsInQueue(queueName); + if (apps == null) { + String errMsg = "The specified Queue: " + queueName + " doesn't exist"; + LOG.warn(errMsg); + throw new YarnException(errMsg); + } + // generate kill events for each pending/running app + for (ApplicationAttemptId app : apps) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL)); + } + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Wed Aug 20 01:34:29 2014 @@ -54,7 +54,7 @@ public class AppSchedulingInfo { private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class); private final ApplicationAttemptId applicationAttemptId; final ApplicationId applicationId; - private final String queueName; + private String queueName; Queue queue; final String user; // TODO making containerIdCounter long @@ -360,7 
+360,7 @@ public class AppSchedulingInfo { List<ResourceRequest> resourceRequests) { // Update future requirements decrementOutstanding(offSwitchRequest); - // Update cloned RackLocal and OffRack requests for recovery + // Update cloned OffRack requests for recovery resourceRequests.add(cloneResourceRequest(offSwitchRequest)); } @@ -410,6 +410,7 @@ public class AppSchedulingInfo { activeUsersManager = newQueue.getActiveUsersManager(); activeUsersManager.activateApplication(user, applicationId); this.queue = newQueue; + this.queueName = newQueue.getQueueName(); } synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) { Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java Wed Aug 20 01:34:29 2014 @@ -17,13 +17,14 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,7 +42,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.base.Preconditions; import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; @@ -87,6 +88,13 @@ public class SchedulerApplicationAttempt protected List<RMContainer> newlyAllocatedContainers = new ArrayList<RMContainer>(); + // This pendingRelease is used in work-preserving recovery scenario to keep + // track of the AM's outstanding release requests. RM on recovery could + // receive the release request form AM before it receives the container status + // from NM for recovery. In this case, the to-be-recovered containers reported + // by NM should not be recovered. 
+ private Set<ContainerId> pendingRelease = null; + /** * Count how many times the application has been given an opportunity * to schedule a task at each priority. Each time the scheduler @@ -114,7 +122,7 @@ public class SchedulerApplicationAttempt new AppSchedulingInfo(applicationAttemptId, user, queue, activeUsersManager, rmContext.getEpoch()); this.queue = queue; - + this.pendingRelease = new HashSet<ContainerId>(); if (rmContext.getRMApps() != null && rmContext.getRMApps() .containsKey(applicationAttemptId.getApplicationId())) { @@ -163,6 +171,10 @@ public class SchedulerApplicationAttempt return appSchedulingInfo.getResourceRequests(priority); } + public Set<ContainerId> getPendingRelease() { + return this.pendingRelease; + } + public int getNewContainerId() { return appSchedulingInfo.getNewContainerId(); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java Wed Aug 20 01:34:29 2014 @@ -153,14 +153,17 @@ public class SchedulerUtils { * @param rmNode RMNode with new resource view * @param clusterResource the cluster's resource that need to update * @param log Scheduler's log for resource change + * @return true if the resources have changed */ - public static void updateResourceIfChanged(SchedulerNode node, + public static boolean updateResourceIfChanged(SchedulerNode node, RMNode rmNode, Resource clusterResource, Log log) { + boolean result = false; Resource oldAvailableResource = node.getAvailableResource(); Resource newAvailableResource = Resources.subtract( rmNode.getTotalCapability(), node.getUsedResource()); if (!newAvailableResource.equals(oldAvailableResource)) { + result = true; Resource deltaResource = Resources.subtract(newAvailableResource, oldAvailableResource); // Reflect resource change to scheduler node. 
@@ -176,6 +179,8 @@ public class SchedulerUtils { + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: " + deltaResource.getMemory() +"MB"); } + + return result; } /** Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java Wed Aug 20 01:34:29 2014 @@ -202,4 +202,22 @@ public interface YarnScheduler extends E @Evolving public String moveApplication(ApplicationId appId, String newQueue) throws YarnException; + + /** + * Completely drain sourceQueue of applications, by moving all of them to + * destQueue. + * + * @param sourceQueue + * @param destQueue + * @throws YarnException + */ + void moveAllApps(String sourceQueue, String destQueue) throws YarnException; + + /** + * Terminate all applications in the specified queue. + * + * @param queueName the name of queue to be drained + * @throws YarnException + */ + void killAllAppsInQueue(String queueName) throws YarnException; } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java Wed Aug 20 01:34:29 2014 @@ -238,4 +238,22 @@ extends org.apache.hadoop.yarn.server.re * @param apps the collection to add the applications to */ public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps); + + /** + * Detach a container from this queue + * @param clusterResource the current cluster resource + * @param application application to which the container was assigned + * @param container the container to detach + */ + public void detachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer container); + + /** + * Attach a container to this queue + * @param clusterResource the current cluster 
resource + * @param application application to which the container was assigned + * @param container the container to attach + */ + public void attachContainer(Resource clusterResource, + FiCaSchedulerApp application, RMContainer container); } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java Wed Aug 20 01:34:29 2014 @@ -17,6 +17,9 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -24,6 +27,8 @@ import org.apache.hadoop.yarn.util.resou class CSQueueUtils { + private static final Log LOG = LogFactory.getLog(CSQueueUtils.class); + final static float EPSILON = 0.0001f; public static void checkMaxCapacity(String queueName, @@ -113,4 +118,52 @@ class CSQueueUtils { ) ); } + + public static float getAbsoluteMaxAvailCapacity( + ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) { + CSQueue parent = queue.getParent(); + if (parent == null) { + return queue.getAbsoluteMaximumCapacity(); + } + + //Get my parent's max avail, needed to determine my own + float parentMaxAvail = getAbsoluteMaxAvailCapacity( + resourceCalculator, clusterResource, parent); + //...and as a resource + Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail); + + //check for no resources parent before dividing, if so, max avail is none + if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) { + return 0.0f; + } + //sibling used is parent used - my used... 
+ float siblingUsedCapacity = Resources.ratio( + resourceCalculator, + Resources.subtract(parent.getUsedResources(), queue.getUsedResources()), + parentResource); + //my max avail is the lesser of my max capacity and what is unused from my parent + //by my siblings (if they are beyond their base capacity) + float maxAvail = Math.min( + queue.getMaximumCapacity(), + 1.0f - siblingUsedCapacity); + //and, mutiply by parent to get absolute (cluster relative) value + float absoluteMaxAvail = maxAvail * parentMaxAvail; + + if (LOG.isDebugEnabled()) { + LOG.debug("qpath " + queue.getQueuePath()); + LOG.debug("parentMaxAvail " + parentMaxAvail); + LOG.debug("siblingUsedCapacity " + siblingUsedCapacity); + LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity()); + LOG.debug("maxAvail " + maxAvail); + LOG.debug("absoluteMaxAvail " + absoluteMaxAvail); + } + + if (absoluteMaxAvail < 0.0f) { + absoluteMaxAvail = 0.0f; + } else if (absoluteMaxAvail > 1.0f) { + absoluteMaxAvail = 1.0f; + } + + return absoluteMaxAvail; + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Wed Aug 20 01:34:29 2014 @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import com.google.common.base.Preconditions; - import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -41,6 +39,7 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -53,15 +52,11 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -76,6 +71,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -93,6 +90,7 @@ import org.apache.hadoop.yarn.util.resou import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; @LimitedPrivate("yarn") @Evolving @@ -198,6 +196,16 @@ public class CapacityScheduler extends + ".scheduling-interval-ms"; private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; + private boolean overrideWithQueueMappings = false; + private List<QueueMapping> mappings = null; + private Groups groups; + + @VisibleForTesting + public synchronized String getMappedQueueForTest(String user) + throws IOException { + return getMappedQueue(user); + } + public CapacityScheduler() { super(CapacityScheduler.class.getName()); } @@ -262,7 +270,6 @@ public class CapacityScheduler extends this.applications = new ConcurrentHashMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>(); - initializeQueues(this.conf); scheduleAsynchronously = this.conf.getScheduleAynschronously(); @@ -401,7 +408,32 @@ public class CapacityScheduler extends } } private static 
final QueueHook noop = new QueueHook(); - + + private void initializeQueueMappings() throws IOException { + overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + LOG.info("Initialized queue mappings, override: " + + overrideWithQueueMappings); + // Get new user/group mappings + List<QueueMapping> newMappings = conf.getQueueMappings(); + //check if mappings refer to valid queues + for (QueueMapping mapping : newMappings) { + if (!mapping.queue.equals(CURRENT_USER_MAPPING) && + !mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { + CSQueue queue = queues.get(mapping.queue); + if (queue == null || !(queue instanceof LeafQueue)) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping.queue); + } + } + } + //apply the new mappings since they are valid + mappings = newMappings; + // initialize groups if mappings are present + if (mappings.size() > 0) { + groups = new Groups(conf); + } + } + @Lock(CapacityScheduler.class) private void initializeQueues(CapacitySchedulerConfiguration conf) throws IOException { @@ -409,7 +441,9 @@ public class CapacityScheduler extends root = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, noop); + LOG.info("Initialized root queue " + root); + initializeQueueMappings(); } @Lock(CapacityScheduler.class) @@ -429,6 +463,7 @@ public class CapacityScheduler extends // Re-configure queues root.reinitialize(newRoot, clusterResource); + initializeQueueMappings(); } /** @@ -516,12 +551,73 @@ public class CapacityScheduler extends } synchronized CSQueue getQueue(String queueName) { + if (queueName == null) { + return null; + } return queues.get(queueName); } + private static final String CURRENT_USER_MAPPING = "%user"; + + private static final String PRIMARY_GROUP_MAPPING = "%primary_group"; + + private String getMappedQueue(String user) throws IOException { + for (QueueMapping mapping : mappings) { + if (mapping.type == MappingType.USER) { + if (mapping.source.equals(CURRENT_USER_MAPPING)) { + if (mapping.queue.equals(CURRENT_USER_MAPPING)) { + return user; + } + else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { + return groups.getGroups(user).get(0); + } + else { + return mapping.queue; + } + } + if (user.equals(mapping.source)) { + return mapping.queue; + } + } + if (mapping.type == MappingType.GROUP) { + for (String userGroups : groups.getGroups(user)) { + if (userGroups.equals(mapping.source)) { + return mapping.queue; + } + } + } + } + return null; + } + private synchronized void addApplication(ApplicationId applicationId, - String queueName, String user, boolean isAppRecovering) { - // santiy checks. + String queueName, String user, boolean isAppRecovering) { + + if (mappings != null && mappings.size() > 0) { + try { + String mappedQueue = getMappedQueue(user); + if (mappedQueue != null) { + // We have a mapping, should we use it? 
+ if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) + || overrideWithQueueMappings) { + LOG.info("Application " + applicationId + " user " + user + + " mapping [" + queueName + "] to [" + mappedQueue + + "] override " + overrideWithQueueMappings); + queueName = mappedQueue; + RMApp rmApp = rmContext.getRMApps().get(applicationId); + rmApp.setQueue(queueName); + } + } + } catch (IOException ioex) { + String message = "Failed to submit application " + applicationId + + " submitted by user " + user + " reason: " + ioex.getMessage(); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppRejectedEvent(applicationId, message)); + return; + } + } + + // sanity checks. CSQueue queue = getQueue(queueName); if (queue == null) { String message = "Application " + applicationId + @@ -547,6 +643,8 @@ public class CapacityScheduler extends .handle(new RMAppRejectedEvent(applicationId, ace.toString())); return; } + // update the metrics + queue.getMetrics().submitApp(user); SchedulerApplication<FiCaSchedulerApp> application = new SchedulerApplication<FiCaSchedulerApp>(queue, user); applications.put(applicationId, application); @@ -689,21 +787,7 @@ public class CapacityScheduler extends getMinimumResourceCapability(), maximumAllocation); // Release containers - for (ContainerId releasedContainerId : release) { - RMContainer rmContainer = getRMContainer(releasedContainerId); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "CapacityScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); - } - completedContainer(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainerId, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { @@ -783,7 +867,10 @@ public class CapacityScheduler extends FiCaSchedulerNode node = getNode(nm.getNodeID()); // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG); + if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, + LOG)) { + root.updateClusterResource(clusterResource); + } List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates(); List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>(); @@ -896,8 +983,8 @@ public class CapacityScheduler extends { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser(), - appAddedEvent.getIsAppRecovering()); + appAddedEvent.getQueue(), + appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering()); } break; case APP_REMOVED: @@ -995,7 +1082,8 @@ public class CapacityScheduler extends } @Lock(CapacityScheduler.class) - private synchronized void completedContainer(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -1128,4 +1216,59 @@ public class CapacityScheduler extends throw new IOException(e); } } + + @Override + public synchronized String moveApplication(ApplicationId appId, + String targetQueueName) throws YarnException { + FiCaSchedulerApp app = + 
getApplicationAttempt(ApplicationAttemptId.newInstance(appId, 0)); + String sourceQueueName = app.getQueue().getQueueName(); + LeafQueue source = getAndCheckLeafQueue(sourceQueueName); + LeafQueue dest = getAndCheckLeafQueue(targetQueueName); + // Validation check - ACLs, submission limits for user & queue + String user = app.getUser(); + try { + dest.submitApplication(appId, user, targetQueueName); + } catch (AccessControlException e) { + throw new YarnException(e); + } + // Move all live containers + for (RMContainer rmContainer : app.getLiveContainers()) { + source.detachContainer(clusterResource, app, rmContainer); + // attach the Container to another queue + dest.attachContainer(clusterResource, app, rmContainer); + } + // Detach the application.. + source.finishApplicationAttempt(app, sourceQueueName); + source.getParent().finishApplication(appId, app.getUser()); + // Finish app & update metrics + app.move(dest); + // Submit to a new queue + dest.submitApplicationAttempt(app, user); + applications.get(appId).setQueue(dest); + LOG.info("App: " + app.getApplicationId() + " successfully moved from " + + sourceQueueName + " to: " + targetQueueName); + return targetQueueName; + } + + /** + * Check that the String provided in input is the name of an existing, + * LeafQueue, if successful returns the queue. + * + * @param queue + * @return the LeafQueue + * @throws YarnException + */ + private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException { + CSQueue ret = this.getQueue(queue); + if (ret == null) { + throw new YarnException("The specified Queue: " + queue + + " doesn't exist"); + } + if (!(ret instanceof LeafQueue)) { + throw new YarnException("The specified Queue: " + queue + + " is not a Leaf Queue. Move is supported only for Leaf Queues."); + } + return (LeafQueue) ret; + } } Modified: hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java Wed Aug 20 01:34:29 2014 @@ -18,8 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -145,6 +144,44 @@ public class CapacitySchedulerConfigurat @Private public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false; + + @Private + public static final String QUEUE_MAPPING = PREFIX + "queue-mappings"; + + @Private + public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + 
"-override.enable"; + + @Private + public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false; + + @Private + public static class QueueMapping { + + public enum MappingType { + + USER("u"), + GROUP("g"); + private final String type; + private MappingType(String type) { + this.type = type; + } + + public String toString() { + return type; + } + + }; + + MappingType type; + String source; + String queue; + + public QueueMapping(MappingType type, String source, String queue) { + this.type = type; + this.source = source; + this.queue = queue; + } + } public CapacitySchedulerConfiguration() { this(new Configuration()); @@ -378,4 +415,82 @@ public class CapacitySchedulerConfigurat setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async); } + public boolean getOverrideWithQueueMappings() { + return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, + DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE); + } + + /** + * Returns a collection of strings, trimming leading and trailing whitespeace + * on each value + * + * @param str + * String to parse + * @param delim + * delimiter to separate the values + * @return Collection of parsed elements. + */ + private static Collection<String> getTrimmedStringCollection(String str, + String delim) { + List<String> values = new ArrayList<String>(); + if (str == null) + return values; + StringTokenizer tokenizer = new StringTokenizer(str, delim); + while (tokenizer.hasMoreTokens()) { + String next = tokenizer.nextToken(); + if (next == null || next.trim().isEmpty()) { + continue; + } + values.add(next.trim()); + } + return values; + } + + /** + * Get user/group mappings to queues. + * + * @return user/groups mappings or null on illegal configs + */ + public List<QueueMapping> getQueueMappings() { + List<QueueMapping> mappings = + new ArrayList<CapacitySchedulerConfiguration.QueueMapping>(); + Collection<String> mappingsString = + getTrimmedStringCollection(QUEUE_MAPPING); + for (String mappingValue : mappingsString) { + String[] mapping = + getTrimmedStringCollection(mappingValue, ":") + .toArray(new String[] {}); + if (mapping.length != 3 || mapping[1].length() == 0 + || mapping[2].length() == 0) { + throw new IllegalArgumentException( + "Illegal queue mapping " + mappingValue); + } + + QueueMapping m; + try { + QueueMapping.MappingType mappingType; + if (mapping[0].equals("u")) { + mappingType = QueueMapping.MappingType.USER; + } else if (mapping[0].equals("g")) { + mappingType = QueueMapping.MappingType.GROUP; + } else { + throw new IllegalArgumentException( + "unknown mapping prefix " + mapping[0]); + } + m = new QueueMapping( + mappingType, + mapping[1], + mapping[2]); + } catch (Throwable t) { + throw new IllegalArgumentException( + "Illegal queue mapping " + mappingValue); + } + + if (m != null) { + mappings.add(m); + } + } + + return mappings; + } }

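
A small, self-contained sketch of the arithmetic behind the CSQueueUtils.getAbsoluteMaxAvailCapacity() method added above; the class name and the numbers are illustrative, and a single memory dimension stands in for the ResourceCalculator/Resource handling in the real code. A queue's absolute max available capacity is the lesser of its own maximum capacity and whatever its siblings leave unused of the parent, multiplied by the parent's own absolute max available capacity and clamped to [0, 1]:

// Illustrative sketch only; not part of this commit.
public class AbsMaxAvailCapacitySketch {
  public static void main(String[] args) {
    float clusterMB      = 100000f; // total cluster memory
    float parentMaxAvail = 1.0f;    // the parent (root) may use the whole cluster
    float parentUsedMB   = 60000f;  // memory used under the parent
    float myUsedMB       = 20000f;  // this queue's share of that usage
    float myMaxCapacity  = 0.5f;    // this queue's maximum-capacity, as a fraction of the parent

    float parentResourceMB = clusterMB * parentMaxAvail;
    // Sibling usage = parent's usage minus this queue's own usage, as a fraction of the parent.
    float siblingUsedCapacity = (parentUsedMB - myUsedMB) / parentResourceMB;       // 0.4
    // Lesser of my max capacity and what the siblings leave unused...
    float maxAvail = Math.min(myMaxCapacity, 1.0f - siblingUsedCapacity);           // min(0.5, 0.6) = 0.5
    // ...made cluster-relative via the parent's value, then clamped to [0, 1].
    float absoluteMaxAvail = Math.max(0.0f, Math.min(1.0f, maxAvail * parentMaxAvail)); // 0.5

    System.out.println("absoluteMaxAvail = " + absoluteMaxAvail); // i.e. 50% of the cluster
  }
}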