Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Tue Aug 19 23:49:39 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.PolicyProvider; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; @@ -65,6 +66,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import 
org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; @@ -79,6 +81,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; @@ -88,6 +91,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; +import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; @@ -107,12 +111,15 @@ public class ApplicationMasterService ex new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>(); private final AllocateResponse resync = recordFactory.newRecordInstance(AllocateResponse.class); + private final AllocateResponse shutdown = + recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; + this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN); this.resync.setAMCommand(AMCommand.AM_RESYNC); this.rmContext = rmContext; } @@ -123,6 +130,7 @@ 
public class ApplicationMasterService ex YarnRPC rpc = YarnRPC.create(conf); InetSocketAddress masterServiceAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT); @@ -155,7 +163,9 @@ public class ApplicationMasterService ex this.server.start(); this.bindAddress = - conf.updateConnectAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, + conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS, server.getListenerAddress()); super.serviceStart(); } @@ -182,7 +192,7 @@ public class ApplicationMasterService ex return result; } - private ApplicationAttemptId authorizeRequest() + private AMRMTokenIdentifier authorizeRequest() throws YarnException { UserGroupInformation remoteUgi; @@ -219,7 +229,7 @@ public class ApplicationMasterService ex throw RPCUtil.getRemoteException(message); } - return appTokenIdentifier.getApplicationAttemptId(); + return appTokenIdentifier; } @Override @@ -227,7 +237,9 @@ public class ApplicationMasterService ex RegisterApplicationMasterRequest request) throws YarnException, IOException { - ApplicationAttemptId applicationAttemptId = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + ApplicationAttemptId applicationAttemptId = + amrmTokenIdentifier.getApplicationAttemptId(); ApplicationId appID = applicationAttemptId.getApplicationId(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); @@ -298,9 +310,12 @@ public class ApplicationMasterService ex List<NMToken> nmTokens = new ArrayList<NMToken>(); for (Container container : transferredContainers) { try { - nmTokens.add(rmContext.getNMTokenSecretManager() - .createAndGetNMToken(app.getUser(), applicationAttemptId, - container)); + NMToken token = rmContext.getNMTokenSecretManager() + .createAndGetNMToken(app.getUser(), 
applicationAttemptId, + container); + if (null != token) { + nmTokens.add(token); + } } catch (IllegalArgumentException e) { // if it's a DNS issue, throw UnknowHostException directly and that // will be automatically retried by RMProxy in RPC layer. @@ -323,7 +338,8 @@ public class ApplicationMasterService ex FinishApplicationMasterRequest request) throws YarnException, IOException { - ApplicationAttemptId applicationAttemptId = authorizeRequest(); + ApplicationAttemptId applicationAttemptId = + authorizeRequest().getApplicationAttemptId(); AllocateResponseLock lock = responseMap.get(applicationAttemptId); if (lock == null) { @@ -343,9 +359,9 @@ public class ApplicationMasterService ex AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService", message, applicationAttemptId.getApplicationId(), applicationAttemptId); - throw new InvalidApplicationMasterRequestException(message); + throw new ApplicationMasterNotRegisteredException(message); } - + this.amLivelinessMonitor.receivedPing(applicationAttemptId); RMApp rmApp = @@ -398,7 +414,10 @@ public class ApplicationMasterService ex public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { - ApplicationAttemptId appAttemptId = authorizeRequest(); + AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest(); + + ApplicationAttemptId appAttemptId = + amrmTokenIdentifier.getApplicationAttemptId(); this.amLivelinessMonitor.receivedPing(appAttemptId); @@ -406,22 +425,23 @@ public class ApplicationMasterService ex AllocateResponseLock lock = responseMap.get(appAttemptId); if (lock == null) { LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); - return resync; + return shutdown; } synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "Application Master is trying to allocate before registering for: " - + appAttemptId.getApplicationId(); - LOG.error(message); + "Application 
Master is not registered for known application: " + + appAttemptId.getApplicationId() + + ". Let AM resync."; + LOG.info(message); RMAuditLogger.logFailure( this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) .getUser(), AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, appAttemptId.getApplicationId(), appAttemptId); - throw new InvalidApplicationMasterRequestException(message); + return resync; } if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { @@ -546,6 +566,23 @@ public class ApplicationMasterService ex allocateResponse .setPreemptionMessage(generatePreemptionMessage(allocation)); + // update AMRMToken if the token is rolled-up + MasterKeyData nextMasterKey = + this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData(); + + if (nextMasterKey != null + && nextMasterKey.getMasterKey().getKeyId() != amrmTokenIdentifier + .getKeyId()) { + Token<AMRMTokenIdentifier> amrmToken = + rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + appAttemptId); + ((RMAppAttemptImpl)appAttempt).setAMRMToken(amrmToken); + allocateResponse.setAMRMToken(org.apache.hadoop.yarn.api.records.Token + .newInstance(amrmToken.getIdentifier(), amrmToken.getKind() + .toString(), amrmToken.getPassword(), amrmToken.getService() + .toString())); + } + /* * As we are updating the response inside the lock object so we don't * need to worry about unregister call occurring in between (which
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Aug 19 23:49:39 2014 @@ -199,7 +199,9 @@ public class ClientRMService extends Abs } this.server.start(); - clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS, + clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADDRESS, server.getListenerAddress()); super.serviceStart(); } @@ -213,7 +215,9 @@ public class ClientRMService extends Abs } InetSocketAddress getBindAddress(Configuration conf) { - return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, + return conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); } @@ -919,7 +923,7 @@ public class ClientRMService extends Abs protoToken.getIdentifier().array(), protoToken.getPassword().array(), new Text(protoToken.getKind()), new Text(protoToken.getService())); - String user = getRenewerForToken(token); + String user = 
UserGroupInformation.getCurrentUser().getUserName(); rmDTSecretManager.cancelToken(token, user); return Records.newRecord(CancelDelegationTokenResponse.class); } catch (IOException e) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java Tue Aug 19 23:49:39 2014 @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.ActiveStandbyElector; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.ServiceFailedException; @@ -60,7 +61,7 @@ public class EmbeddedElectorService exte } @Override - protected synchronized void serviceInit(Configuration conf) + protected void serviceInit(Configuration conf) throws Exception { conf = conf instanceof YarnConfiguration ? 
conf : new YarnConfiguration(conf); @@ -85,8 +86,11 @@ public class EmbeddedElectorService exte List<ACL> zkAcls = RMZKUtils.getZKAcls(conf); List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf); + int maxRetryNum = conf.getInt( + CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY, + CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT); elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout, - electionZNode, zkAcls, zkAuths, this); + electionZNode, zkAcls, zkAuths, this, maxRetryNum); elector.ensureParentZNode(); if (!isParentZnodeSafe(clusterId)) { @@ -98,20 +102,20 @@ public class EmbeddedElectorService exte } @Override - protected synchronized void serviceStart() throws Exception { + protected void serviceStart() throws Exception { elector.joinElection(localActiveNodeInfo); super.serviceStart(); } @Override - protected synchronized void serviceStop() throws Exception { + protected void serviceStop() throws Exception { elector.quitElection(false); elector.terminateConnection(); super.serviceStop(); } @Override - public synchronized void becomeActive() throws ServiceFailedException { + public void becomeActive() throws ServiceFailedException { try { rmContext.getRMAdminService().transitionToActive(req); } catch (Exception e) { @@ -120,7 +124,7 @@ public class EmbeddedElectorService exte } @Override - public synchronized void becomeStandby() { + public void becomeStandby() { try { rmContext.getRMAdminService().transitionToStandby(req); } catch (Exception e) { @@ -139,13 +143,13 @@ public class EmbeddedElectorService exte @SuppressWarnings(value = "unchecked") @Override - public synchronized void notifyFatalError(String errorMessage) { + public void notifyFatalError(String errorMessage) { rmContext.getDispatcher().getEventHandler().handle( new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage)); } @Override - public synchronized void fenceOldActive(byte[] oldActiveData) { + public void fenceOldActive(byte[] 
oldActiveData) { if (LOG.isDebugEnabled()) { LOG.debug("Request to fence old active being ignored, " + "as embedded leader election doesn't support fencing"); @@ -162,7 +166,7 @@ public class EmbeddedElectorService exte .toByteArray(); } - private synchronized boolean isParentZnodeSafe(String clusterId) + private boolean isParentZnodeSafe(String clusterId) throws InterruptedException, IOException, KeeperException { byte[] data; try { @@ -190,4 +194,9 @@ public class EmbeddedElectorService exte } return true; } + + public void resetLeaderElection() { + elector.quitElection(false); + elector.joinElection(localActiveNodeInfo); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Tue Aug 19 23:49:39 2014 @@ -99,4 +99,8 @@ public interface RMContext { RMApplicationHistoryWriter rmApplicationHistoryWriter); ConfigurationProvider getConfigurationProvider(); + + boolean isWorkPreservingRecoveryEnabled(); + + int getEpoch(); } \ No newline at end of file Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Tue Aug 19 23:49:39 2014 @@ -60,6 +60,7 @@ public class RMContextImpl implements RM = new ConcurrentHashMap<String, RMNode>(); private boolean isHAEnabled; + private boolean isWorkPreservingRecoveryEnabled; private HAServiceState haServiceState = HAServiceProtocol.HAServiceState.INITIALIZING; @@ -81,6 +82,7 @@ public class RMContextImpl implements RM private ApplicationMasterService applicationMasterService; private RMApplicationHistoryWriter rmApplicationHistoryWriter; private ConfigurationProvider configurationProvider; + private int epoch; /** * Default constructor. 
To be used in conjunction with setter methods for @@ -329,6 +331,15 @@ public class RMContextImpl implements RM } } + public void setWorkPreservingRecoveryEnabled(boolean enabled) { + this.isWorkPreservingRecoveryEnabled = enabled; + } + + @Override + public boolean isWorkPreservingRecoveryEnabled() { + return this.isWorkPreservingRecoveryEnabled; + } + @Override public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { return rmApplicationHistoryWriter; @@ -349,4 +360,13 @@ public class RMContextImpl implements RM ConfigurationProvider configurationProvider) { this.configurationProvider = configurationProvider; } + + @Override + public int getEpoch() { + return this.epoch; + } + + void setEpoch(int epoch) { + this.epoch = epoch; + } } \ No newline at end of file Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMSecretManagerService.java Tue Aug 19 23:49:39 2014 @@ -60,7 +60,7 @@ public class RMSecretManagerService exte clientToAMSecretManager = createClientToAMTokenSecretManager(); 
rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager); - amRmTokenSecretManager = createAMRMTokenSecretManager(conf); + amRmTokenSecretManager = createAMRMTokenSecretManager(conf, this.rmContext); rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager); rmDTSecretManager = @@ -115,8 +115,8 @@ public class RMSecretManagerService exte } protected AMRMTokenSecretManager createAMRMTokenSecretManager( - Configuration conf) { - return new AMRMTokenSecretManager(conf); + Configuration conf, RMContext rmContext) { + return new AMRMTokenSecretManager(conf, rmContext); } protected ClientToAMTokenSecretManagerInRM createClientToAMTokenSecretManager() { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java Tue Aug 19 23:49:39 2014 @@ -28,6 +28,7 @@ import org.apache.hadoop.security.Access import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; 
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; @@ -43,6 +44,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.Resources; /** * Utility methods to aid serving RM data through the REST and RPC APIs @@ -225,4 +228,13 @@ public class RMServerUtils { } } + /** + * Statically defined dummy ApplicationResourceUsageReport. Used as + * a return value when a valid report cannot be found. + */ + public static final ApplicationResourceUsageReport + DUMMY_APPLICATION_RESOURCE_USAGE_REPORT = + BuilderUtils.newApplicationResourceUsageReport(-1, -1, + Resources.createResource(-1, -1), Resources.createResource(-1, -1), + Resources.createResource(-1, -1)); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original) +++ 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue Aug 19 23:49:39 2014 @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -32,11 +33,14 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.http.lib.StaticUserWebFilter; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.security.AuthenticationFilterInitializer; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; @@ -88,8 +92,11 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMAuthenticationHandler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter; +import 
org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer; import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher; import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils; import org.apache.hadoop.yarn.server.webproxy.WebAppProxy; @@ -150,7 +157,8 @@ public class ResourceManager extends Com private AppReportFetcher fetcher = null; protected ResourceTrackerService resourceTracker; - private String webAppAddress; + @VisibleForTesting + protected String webAppAddress; private ConfigurationProvider configurationProvider = null; /** End of Active services */ @@ -225,7 +233,9 @@ public class ResourceManager extends Com } createAndInitActiveServices(); - webAppAddress = WebAppUtils.getRMWebAppURLWithoutScheme(this.conf); + webAppAddress = WebAppUtils.getWebAppBindURL(this.conf, + YarnConfiguration.RM_BIND_HOST, + WebAppUtils.getRMWebAppURLWithoutScheme(this.conf)); this.rmLoginUGI = UserGroupInformation.getCurrentUser(); @@ -327,7 +337,7 @@ public class ResourceManager extends Com * RMActiveServices handles all the Active services in the RM. 
*/ @Private - class RMActiveServices extends CompositeService { + public class RMActiveServices extends CompositeService { private DelegationTokenRenewer delegationTokenRenewer; private EventHandler<SchedulerEvent> schedulerDispatcher; @@ -364,9 +374,15 @@ public class ResourceManager extends Com YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); RMStateStore rmStore = null; - if(isRecoveryEnabled) { + if (isRecoveryEnabled) { recoveryEnabled = true; - rmStore = RMStateStoreFactory.getStore(conf); + rmStore = RMStateStoreFactory.getStore(conf); + boolean isWorkPreservingRecoveryEnabled = + conf.getBoolean( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED); + rmContext + .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled); } else { recoveryEnabled = false; rmStore = new NullRMStateStore(); @@ -401,6 +417,8 @@ public class ResourceManager extends Com // Initialize the scheduler scheduler = createScheduler(); + scheduler.setRMContext(rmContext); + addIfService(scheduler); rmContext.setScheduler(scheduler); schedulerDispatcher = createSchedulerEventDispatcher(); @@ -429,12 +447,6 @@ public class ResourceManager extends Com DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); - try { - scheduler.reinitialize(conf, rmContext); - } catch (IOException ioe) { - throw new RuntimeException("Failed to initialize scheduler", ioe); - } - // creating monitors that handle preemption createPolicyMonitors(); @@ -451,7 +463,6 @@ public class ResourceManager extends Com rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); clientRM = createClientRMService(); - rmContext.setClientRMService(clientRM); addService(clientRM); rmContext.setClientRMService(clientRM); @@ -480,6 +491,9 @@ public class ResourceManager extends Com if(recoveryEnabled) { try { rmStore.checkVersion(); + if (rmContext.isWorkPreservingRecoveryEnabled()) { + 
rmContext.setEpoch(rmStore.getAndIncrementEpoch()); + } RMState state = rmStore.loadState(); recover(state); } catch (Exception e) { @@ -524,11 +538,9 @@ public class ResourceManager extends Com (PreemptableResourceScheduler) scheduler)); for (SchedulingEditPolicy policy : policies) { LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); - policy.init(conf, rmContext.getDispatcher().getEventHandler(), - (PreemptableResourceScheduler) scheduler); // periodically check whether we need to take action to guarantee // constraints - SchedulingMonitor mon = new SchedulingMonitor(policy); + SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy); addService(mon); } } else { @@ -664,6 +676,7 @@ public class ResourceManager extends Com // Transition to standby and reinit active services LOG.info("Transitioning RM to Standby mode"); rm.transitionToStandby(true); + rm.adminService.resetLeaderElection(); return; } catch (Exception e) { LOG.fatal("Failed to transition RM to Standby mode."); @@ -785,6 +798,88 @@ public class ResourceManager extends Com } protected void startWepApp() { + + // Use the customized yarn filter instead of the standard kerberos filter to + // allow users to authenticate using delegation tokens + // 4 conditions need to be satisfied - + // 1. security is enabled + // 2. http auth type is set to kerberos + // 3. "yarn.resourcemanager.webapp.use-yarn-filter" override is set to true + // 4. 
hadoop.http.filter.initializers contains AuthenticationFilterInitializer + + Configuration conf = getConfig(); + boolean useYarnAuthenticationFilter = + conf.getBoolean( + YarnConfiguration.RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER, + YarnConfiguration.DEFAULT_RM_WEBAPP_DELEGATION_TOKEN_AUTH_FILTER); + String authPrefix = "hadoop.http.authentication."; + String authTypeKey = authPrefix + "type"; + String filterInitializerConfKey = "hadoop.http.filter.initializers"; + String actualInitializers = ""; + Class<?>[] initializersClasses = + conf.getClasses(filterInitializerConfKey); + + boolean hasHadoopAuthFilterInitializer = false; + boolean hasRMAuthFilterInitializer = false; + if (initializersClasses != null) { + for (Class<?> initializer : initializersClasses) { + if (initializer.getName().equals( + AuthenticationFilterInitializer.class.getName())) { + hasHadoopAuthFilterInitializer = true; + } + if (initializer.getName().equals( + RMAuthenticationFilterInitializer.class.getName())) { + hasRMAuthFilterInitializer = true; + } + } + if (UserGroupInformation.isSecurityEnabled() + && useYarnAuthenticationFilter + && hasHadoopAuthFilterInitializer + && conf.get(authTypeKey, "").equals( + KerberosAuthenticationHandler.TYPE)) { + ArrayList<String> target = new ArrayList<String>(); + for (Class<?> filterInitializer : initializersClasses) { + if (filterInitializer.getName().equals( + AuthenticationFilterInitializer.class.getName())) { + if (hasRMAuthFilterInitializer == false) { + target.add(RMAuthenticationFilterInitializer.class.getName()); + } + continue; + } + target.add(filterInitializer.getName()); + } + actualInitializers = StringUtils.join(",", target); + + LOG.info("Using RM authentication filter(kerberos/delegation-token)" + + " for RM webapp authentication"); + RMAuthenticationHandler + .setSecretManager(getClientRMService().rmDTSecretManager); + String yarnAuthKey = + authPrefix + RMAuthenticationFilter.AUTH_HANDLER_PROPERTY; + conf.setStrings(yarnAuthKey, 
RMAuthenticationHandler.class.getName()); + conf.set(filterInitializerConfKey, actualInitializers); + } + } + + // if security is not enabled and the default filter initializer has not + // been set, set the initializer to include the + // RMAuthenticationFilterInitializer which in turn will set up the simple + // auth filter. + + String initializers = conf.get(filterInitializerConfKey); + if (!UserGroupInformation.isSecurityEnabled()) { + if (initializersClasses == null || initializersClasses.length == 0) { + conf.set(filterInitializerConfKey, + RMAuthenticationFilterInitializer.class.getName()); + conf.set(authTypeKey, "simple"); + } else if (initializers.equals(StaticUserWebFilter.class.getName())) { + conf.set(filterInitializerConfKey, + RMAuthenticationFilterInitializer.class.getName() + "," + + initializers); + conf.set(authTypeKey, "simple"); + } + } + Builder<ApplicationMasterService> builder = WebApps .$for("cluster", ApplicationMasterService.class, masterService, @@ -1022,6 +1117,9 @@ public class ResourceManager extends Com // recover RMdelegationTokenSecretManager rmContext.getRMDelegationTokenSecretManager().recover(state); + // recover AMRMTokenSecretManager + rmContext.getAMRMTokenSecretManager().recover(state); + // recover applications rmAppManager.recover(state); } @@ -1031,12 +1129,17 @@ public class ResourceManager extends Com StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); try { Configuration conf = new YarnConfiguration(); - ResourceManager resourceManager = new ResourceManager(); - ShutdownHookManager.get().addShutdownHook( - new CompositeServiceShutdownHook(resourceManager), - SHUTDOWN_HOOK_PRIORITY); - resourceManager.init(conf); - resourceManager.start(); + // If -format-state-store, then delete RMStateStore; else startup normally + if (argv.length == 1 && argv[0].equals("-format-state-store")) { + deleteRMStateStore(conf); + } else { + ResourceManager resourceManager = new ResourceManager(); + 
ShutdownHookManager.get().addShutdownHook( + new CompositeServiceShutdownHook(resourceManager), + SHUTDOWN_HOOK_PRIORITY); + resourceManager.init(conf); + resourceManager.start(); + } } catch (Throwable t) { LOG.fatal("Error starting ResourceManager", t); System.exit(-1); @@ -1058,6 +1161,9 @@ public class ResourceManager extends Com ((Service)dispatcher).init(this.conf); ((Service)dispatcher).start(); removeService((Service)rmDispatcher); + // Need to stop previous rmDispatcher before assigning new dispatcher + // otherwise causes "AsyncDispatcher event handler" thread leak + ((Service) rmDispatcher).stop(); rmDispatcher = dispatcher; addIfService(rmDispatcher); rmContext.setDispatcher(rmDispatcher); @@ -1073,4 +1179,23 @@ public class ResourceManager extends Com return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, YarnConfiguration.DEFAULT_RM_PORT); } + + /** + * Deletes the RMStateStore + * + * @param conf + * @throws Exception + */ + private static void deleteRMStateStore(Configuration conf) throws Exception { + RMStateStore rmStore = RMStateStoreFactory.getStore(conf); + rmStore.init(conf); + rmStore.start(); + try { + LOG.info("Deleting ResourceManager state store..."); + rmStore.deleteStore(); + LOG.info("State store deleted"); + } finally { + rmStore.stop(); + } + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Tue Aug 19 23:49:39 2014 @@ -32,7 +32,6 @@ import org.apache.hadoop.service.Abstrac import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.factories. import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import 
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -120,6 +121,7 @@ public class ResourceTrackerService exte @Override protected void serviceInit(Configuration conf) throws Exception { resourceTrackerAddress = conf.getSocketAddr( + YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); @@ -174,9 +176,11 @@ public class ResourceTrackerService exte } refreshServiceAcls(conf, RMPolicyProvider.getInstance()); } - + this.server.start(); - conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST, + YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, server.getListenerAddress()); } @@ -195,7 +199,7 @@ public class ResourceTrackerService exte */ @SuppressWarnings("unchecked") @VisibleForTesting - void handleContainerStatus(ContainerStatus containerStatus) { + void handleNMContainerStatus(NMContainerStatus containerStatus) { ApplicationAttemptId appAttemptId = containerStatus.getContainerId().getApplicationAttemptId(); RMApp rmApp = @@ -219,11 +223,14 @@ public class ResourceTrackerService exte RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); Container masterContainer = rmAppAttempt.getMasterContainer(); if (masterContainer.getId().equals(containerStatus.getContainerId()) - && containerStatus.getState() == ContainerState.COMPLETE) { + && containerStatus.getContainerState() == ContainerState.COMPLETE) { + ContainerStatus status = + ContainerStatus.newInstance(containerStatus.getContainerId(), + containerStatus.getContainerState(), containerStatus.getDiagnostics(), + containerStatus.getContainerExitStatus()); // sending master container finished event. 
RMAppAttemptContainerFinishedEvent evt = - new RMAppAttemptContainerFinishedEvent(appAttemptId, - containerStatus); + new RMAppAttemptContainerFinishedEvent(appAttemptId, status); rmContext.getDispatcher().getEventHandler().handle(evt); } } @@ -240,13 +247,6 @@ public class ResourceTrackerService exte Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); - if (!request.getContainerStatuses().isEmpty()) { - LOG.info("received container statuses on node manager register :" - + request.getContainerStatuses()); - for (ContainerStatus containerStatus : request.getContainerStatuses()) { - handleContainerStatus(containerStatus); - } - } RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -305,17 +305,31 @@ public class ResourceTrackerService exte RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.STARTED)); + new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), + request.getRunningApplications())); } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeReconnectEvent(nodeId, rmNode)); + new RMNodeReconnectEvent(nodeId, rmNode, + request.getRunningApplications())); } // On every node manager register we will be clearing NMToken keys if // present for any running application. 
this.nmTokenSecretManager.removeNodeKey(nodeId); this.nmLivelinessMonitor.register(nodeId); + + // Handle received container status, this should be processed after new + // RMNode inserted + if (!rmContext.isWorkPreservingRecoveryEnabled()) { + if (!request.getNMContainerStatuses().isEmpty()) { + LOG.info("received container statuses on node manager register :" + + request.getNMContainerStatuses()); + for (NMContainerStatus status : request.getNMContainerStatuses()) { + handleNMContainerStatus(status); + } + } + } String message = "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ahs/RMApplicationHistoryWriter.java Tue Aug 19 23:49:39 2014 @@ -269,7 +269,7 @@ public class RMApplicationHistoryWriter new WritingContainerStartEvent(container.getContainerId(), ContainerStartData.newInstance(container.getContainerId(), container.getAllocatedResource(), container.getAllocatedNode(), - container.getAllocatedPriority(), container.getStartTime()))); + 
container.getAllocatedPriority(), container.getCreationTime()))); } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java Tue Aug 19 23:49:39 2014 @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -226,7 +227,7 @@ public class AMLauncher implements Runna } // Add AMRMToken - Token<AMRMTokenIdentifier> amrmToken = getAMRMToken(); + Token<AMRMTokenIdentifier> amrmToken = createAndSetAMRMToken(); if (amrmToken != null) { credentials.addToken(amrmToken.getService(), amrmToken); } @@ -236,8 +237,12 @@ public class 
AMLauncher implements Runna } @VisibleForTesting - protected Token<AMRMTokenIdentifier> getAMRMToken() { - return application.getAMRMToken(); + protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() { + Token<AMRMTokenIdentifier> amrmToken = + this.rmContext.getAMRMTokenSecretManager().createAndGetAMRMToken( + application.getAppAttemptId()); + ((RMAppAttemptImpl)application).setAMRMToken(amrmToken); + return amrmToken; } @SuppressWarnings("unchecked") Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java Tue Aug 19 23:49:39 2014 @@ -21,6 +21,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import com.google.common.annotations.VisibleForTesting; @@ -34,18 +36,29 @@ public class SchedulingMonitor extends A 
private Thread checkerThread; private volatile boolean stopped; private long monitorInterval; + private RMContext rmContext; - public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) { + public SchedulingMonitor(RMContext rmContext, + SchedulingEditPolicy scheduleEditPolicy) { super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")"); this.scheduleEditPolicy = scheduleEditPolicy; - this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); + this.rmContext = rmContext; } public long getMonitorInterval() { return monitorInterval; } + + @VisibleForTesting + public synchronized SchedulingEditPolicy getSchedulingEditPolicy() { + return scheduleEditPolicy; + } + @SuppressWarnings("unchecked") public void serviceInit(Configuration conf) throws Exception { + scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(), + (PreemptableResourceScheduler) rmContext.getScheduler()); + this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); super.serviceInit(conf); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java (original) +++ 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -110,7 +111,7 @@ public class ProportionalCapacityPreempt public static final String NATURAL_TERMINATION_FACTOR = "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor"; - //the dispatcher to send preempt and kill events + // the dispatcher to send preempt and kill events public EventHandler<ContainerPreemptEvent> dispatcher; private final Clock clock; @@ -164,12 +165,17 @@ public class ProportionalCapacityPreempt observeOnly = config.getBoolean(OBSERVE_ONLY, false); rc = scheduler.getResourceCalculator(); } + + @VisibleForTesting + public ResourceCalculator getResourceCalculator() { + return rc; + } @Override public void editSchedule(){ CSQueue root = scheduler.getRootQueue(); Resource clusterResources = - Resources.clone(scheduler.getClusterResources()); + Resources.clone(scheduler.getClusterResource()); containerBasedPreemptOrKill(root, clusterResources); } @@ -202,7 +208,9 @@ public class ProportionalCapacityPreempt Map<ApplicationAttemptId,Set<RMContainer>> toPreempt = getContainersToPreempt(queues, clusterResources); - logToCSV(queues); + if (LOG.isDebugEnabled()) { + logToCSV(queues); + } // if we are in observeOnly mode return before any action is taken if (observeOnly) { @@ -293,34 +301,31 @@ public class ProportionalCapacityPreempt // with the total capacity for this set of queues Resource unassigned = Resources.clone(tot_guarant); - //assign all cluster resources until no more demand, or no resources are left - while (!qAlloc.isEmpty() && 
Resources.greaterThan(rc, tot_guarant, - unassigned, Resources.none())) { - Resource wQassigned = Resource.newInstance(0, 0); - - // we compute normalizedGuarantees capacity based on currently active - // queues - resetCapacity(rc, unassigned, qAlloc); - - // offer for each queue their capacity first and in following invocations - // their share of over-capacity - for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) { - TempQueue sub = i.next(); - Resource wQavail = - Resources.multiply(unassigned, sub.normalizedGuarantee); - Resource wQidle = sub.offer(wQavail, rc, tot_guarant); - Resource wQdone = Resources.subtract(wQavail, wQidle); - // if the queue returned a value > 0 it means it is fully satisfied - // and it is removed from the list of active queues qAlloc - if (!Resources.greaterThan(rc, tot_guarant, - wQdone, Resources.none())) { - i.remove(); - } - Resources.addTo(wQassigned, wQdone); + // group queues based on whether they have non-zero guaranteed capacity + Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>(); + Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>(); + + for (TempQueue q : qAlloc) { + if (Resources + .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) { + nonZeroGuarQueues.add(q); + } else { + zeroGuarQueues.add(q); } - Resources.subtractFrom(unassigned, wQassigned); } + // first compute the allocation as a fixpoint based on guaranteed capacity + computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned, + false); + + // if any capacity is left unassigned, distributed among zero-guarantee + // queues uniformly (i.e., not based on guaranteed capacity, as this is zero) + if (!zeroGuarQueues.isEmpty() + && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) { + computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned, + true); + } + // based on ideal assignment computed above and current assignment we derive // how much preemption is required overall Resource 
totPreemptionNeeded = Resource.newInstance(0, 0); @@ -353,6 +358,46 @@ public class ProportionalCapacityPreempt } } + + /** + * Given a set of queues compute the fix-point distribution of unassigned + * resources among them. As pending request of a queue are exhausted, the + * queue is removed from the set and remaining capacity redistributed among + * remaining queues. The distribution is weighted based on guaranteed + * capacity, unless asked to ignoreGuarantee, in which case resources are + * distributed uniformly. + */ + private void computeFixpointAllocation(ResourceCalculator rc, + Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned, + boolean ignoreGuarantee) { + //assign all cluster resources until no more demand, or no resources are left + while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant, + unassigned, Resources.none())) { + Resource wQassigned = Resource.newInstance(0, 0); + + // we compute normalizedGuarantees capacity based on currently active + // queues + resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee); + + // offer for each queue their capacity first and in following invocations + // their share of over-capacity + for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) { + TempQueue sub = i.next(); + Resource wQavail = + Resources.multiply(unassigned, sub.normalizedGuarantee); + Resource wQidle = sub.offer(wQavail, rc, tot_guarant); + Resource wQdone = Resources.subtract(wQavail, wQidle); + // if the queue returned a value > 0 it means it is fully satisfied + // and it is removed from the list of active queues qAlloc + if (!Resources.greaterThan(rc, tot_guarant, + wQdone, Resources.none())) { + i.remove(); + } + Resources.addTo(wQassigned, wQdone); + } + Resources.subtractFrom(unassigned, wQassigned); + } + } /** * Computes a normalizedGuaranteed capacity based on active queues @@ -361,14 +406,21 @@ public class ProportionalCapacityPreempt * @param queues the list of queues to consider */ private void 
resetCapacity(ResourceCalculator rc, Resource clusterResource, - List<TempQueue> queues) { + Collection<TempQueue> queues, boolean ignoreGuar) { Resource activeCap = Resource.newInstance(0, 0); - for (TempQueue q : queues) { - Resources.addTo(activeCap, q.guaranteed); - } - for (TempQueue q : queues) { - q.normalizedGuarantee = Resources.divide(rc, clusterResource, - q.guaranteed, activeCap); + + if (ignoreGuar) { + for (TempQueue q : queues) { + q.normalizedGuarantee = (float) 1.0f / ((float) queues.size()); + } + } else { + for (TempQueue q : queues) { + Resources.addTo(activeCap, q.guaranteed); + } + for (TempQueue q : queues) { + q.normalizedGuarantee = Resources.divide(rc, clusterResource, + q.guaranteed, activeCap); + } } } @@ -385,8 +437,9 @@ public class ProportionalCapacityPreempt private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt( List<TempQueue> queues, Resource clusterResource) { - Map<ApplicationAttemptId,Set<RMContainer>> list = + Map<ApplicationAttemptId,Set<RMContainer>> preemptMap = new HashMap<ApplicationAttemptId,Set<RMContainer>>(); + List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>(); for (TempQueue qT : queues) { // we act only if we are violating balance by more than @@ -397,26 +450,83 @@ public class ProportionalCapacityPreempt // accounts for natural termination of containers Resource resToObtain = Resources.multiply(qT.toBePreempted, naturalTerminationFactor); + Resource skippedAMSize = Resource.newInstance(0, 0); // lock the leafqueue while we scan applications and unreserve - synchronized(qT.leafQueue) { - NavigableSet<FiCaSchedulerApp> ns = - (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications(); + synchronized (qT.leafQueue) { + NavigableSet<FiCaSchedulerApp> ns = + (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications(); Iterator<FiCaSchedulerApp> desc = ns.descendingIterator(); qT.actuallyPreempted = Resources.clone(resToObtain); while (desc.hasNext()) { FiCaSchedulerApp 
fc = desc.next(); - if (Resources.lessThanOrEqual(rc, clusterResource, - resToObtain, Resources.none())) { + if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, + Resources.none())) { break; } - list.put(fc.getApplicationAttemptId(), - preemptFrom(fc, clusterResource, resToObtain)); + preemptMap.put( + fc.getApplicationAttemptId(), + preemptFrom(fc, clusterResource, resToObtain, + skippedAMContainerlist, skippedAMSize)); } + Resource maxAMCapacityForThisQueue = Resources.multiply( + Resources.multiply(clusterResource, + qT.leafQueue.getAbsoluteCapacity()), + qT.leafQueue.getMaxAMResourcePerQueuePercent()); + + // Can try preempting AMContainers (still saving atmost + // maxAMCapacityForThisQueue AMResource's) if more resources are + // required to be preempted from this Queue. + preemptAMContainers(clusterResource, preemptMap, + skippedAMContainerlist, resToObtain, skippedAMSize, + maxAMCapacityForThisQueue); } } } - return list; + return preemptMap; + } + + /** + * As more resources are needed for preemption, saved AMContainers has to be + * rescanned. Such AMContainers can be preempted based on resToObtain, but + * maxAMCapacityForThisQueue resources will be still retained. + * + * @param clusterResource + * @param preemptMap + * @param skippedAMContainerlist + * @param resToObtain + * @param skippedAMSize + * @param maxAMCapacityForThisQueue + */ + private void preemptAMContainers(Resource clusterResource, + Map<ApplicationAttemptId, Set<RMContainer>> preemptMap, + List<RMContainer> skippedAMContainerlist, Resource resToObtain, + Resource skippedAMSize, Resource maxAMCapacityForThisQueue) { + for (RMContainer c : skippedAMContainerlist) { + // Got required amount of resources for preemption, can stop now + if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain, + Resources.none())) { + break; + } + // Once skippedAMSize reaches down to maxAMCapacityForThisQueue, + // container selection iteration for preemption will be stopped. 
+ if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize, + maxAMCapacityForThisQueue)) { + break; + } + Set<RMContainer> contToPrempt = preemptMap.get(c + .getApplicationAttemptId()); + if (null == contToPrempt) { + contToPrempt = new HashSet<RMContainer>(); + preemptMap.put(c.getApplicationAttemptId(), contToPrempt); + } + contToPrempt.add(c); + + Resources.subtractFrom(resToObtain, c.getContainer().getResource()); + Resources.subtractFrom(skippedAMSize, c.getContainer() + .getResource()); + } + skippedAMContainerlist.clear(); } /** @@ -428,8 +538,9 @@ public class ProportionalCapacityPreempt * @param rsrcPreempt * @return */ - private Set<RMContainer> preemptFrom( - FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) { + private Set<RMContainer> preemptFrom(FiCaSchedulerApp app, + Resource clusterResource, Resource rsrcPreempt, + List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) { Set<RMContainer> ret = new HashSet<RMContainer>(); ApplicationAttemptId appId = app.getApplicationAttemptId(); @@ -461,6 +572,12 @@ public class ProportionalCapacityPreempt rsrcPreempt, Resources.none())) { return ret; } + // Skip AM Container from preemption for now. 
+ if (c.isAMContainer()) { + skippedAMContainerlist.add(c); + Resources.addTo(skippedAMSize, c.getContainer().getResource()); + continue; + } ret.add(c); Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource()); } @@ -515,18 +632,25 @@ public class ProportionalCapacityPreempt private TempQueue cloneQueues(CSQueue root, Resource clusterResources) { TempQueue ret; synchronized (root) { - float absUsed = root.getAbsoluteUsedCapacity(); + String queueName = root.getQueueName(); + float absUsed = root.getAbsoluteUsedCapacity(); + float absCap = root.getAbsoluteCapacity(); + float absMaxCap = root.getAbsoluteMaximumCapacity(); + Resource current = Resources.multiply(clusterResources, absUsed); - Resource guaranteed = - Resources.multiply(clusterResources, root.getAbsoluteCapacity()); + Resource guaranteed = Resources.multiply(clusterResources, absCap); + Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap); if (root instanceof LeafQueue) { LeafQueue l = (LeafQueue) root; Resource pending = l.getTotalResourcePending(); - ret = new TempQueue(root.getQueueName(), current, pending, guaranteed); + ret = new TempQueue(queueName, current, pending, guaranteed, + maxCapacity); + ret.setLeafQueue(l); } else { Resource pending = Resource.newInstance(0, 0); - ret = new TempQueue(root.getQueueName(), current, pending, guaranteed); + ret = new TempQueue(root.getQueueName(), current, pending, guaranteed, + maxCapacity); for (CSQueue c : root.getChildQueues()) { ret.addChild(cloneQueues(c, clusterResources)); } @@ -551,7 +675,7 @@ public class ProportionalCapacityPreempt sb.append(", "); tq.appendLogString(sb); } - LOG.info(sb.toString()); + LOG.debug(sb.toString()); } /** @@ -563,6 +687,7 @@ public class ProportionalCapacityPreempt final Resource current; final Resource pending; final Resource guaranteed; + final Resource maxCapacity; Resource idealAssigned; Resource toBePreempted; Resource actuallyPreempted; @@ -573,11 +698,12 @@ public class 
ProportionalCapacityPreempt LeafQueue leafQueue; TempQueue(String queueName, Resource current, Resource pending, - Resource guaranteed) { + Resource guaranteed, Resource maxCapacity) { this.queueName = queueName; this.current = current; this.pending = pending; this.guaranteed = guaranteed; + this.maxCapacity = maxCapacity; this.idealAssigned = Resource.newInstance(0, 0); this.actuallyPreempted = Resource.newInstance(0, 0); this.toBePreempted = Resource.newInstance(0, 0); @@ -614,12 +740,12 @@ public class ProportionalCapacityPreempt // the unused ones Resource offer(Resource avail, ResourceCalculator rc, Resource clusterResource) { - // remain = avail - min(avail, current + pending - assigned) - Resource accepted = Resources.min(rc, clusterResource, - avail, - Resources.subtract( - Resources.add(current, pending), - idealAssigned)); + // remain = avail - min(avail, (max - assigned), (current + pending - assigned)) + Resource accepted = + Resources.min(rc, clusterResource, + Resources.subtract(maxCapacity, idealAssigned), + Resources.min(rc, clusterResource, avail, Resources.subtract( + Resources.add(current, pending), idealAssigned))); Resource remain = Resources.subtract(avail, accepted); Resources.addTo(idealAssigned, accepted); return remain; @@ -628,13 +754,15 @@ public class ProportionalCapacityPreempt @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("CUR: ").append(current) + sb.append(" NAME: " + queueName) + .append(" CUR: ").append(current) .append(" PEN: ").append(pending) .append(" GAR: ").append(guaranteed) .append(" NORM: ").append(normalizedGuarantee) .append(" IDEAL_ASSIGNED: ").append(idealAssigned) .append(" IDEAL_PREEMPT: ").append(toBePreempted) - .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted); + .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted) + .append("\n"); return sb.toString(); } Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Tue Aug 19 23:49:39 2014 @@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -43,14 +44,22 @@ import org.apache.hadoop.security.token. 
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto; -import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import 
com.google.common.annotations.VisibleForTesting; @@ -62,14 +71,20 @@ import com.google.common.annotations.Vis * FileSystem interface. Does not use directories so that simple key-value * stores can be used. The retry policy for the real filesystem client must be * configured separately to enable retry of filesystem operations when needed. + * + * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved + * separately. The currentMasterkey and nextMasterkey have been stored. + * Also, AMRMToken has been removed from ApplicationAttemptState. */ public class FileSystemRMStateStore extends RMStateStore { public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class); protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; - protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion - .newInstance(1, 0); + protected static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 2); + protected static final String AMRMTOKEN_SECRET_MANAGER_NODE = + "AMRMTokenSecretManagerNode"; protected FileSystem fs; @@ -83,6 +98,7 @@ public class FileSystemRMStateStore exte @VisibleForTesting Path fsWorkingPath; + Path amrmTokenSecretManagerRoot; @Override public synchronized void initInternal(Configuration conf) throws Exception{ @@ -90,6 +106,8 @@ public class FileSystemRMStateStore exte rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME); rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT); rmAppRoot = new Path(rootDirPath, RM_APP_ROOT); + amrmTokenSecretManagerRoot = + new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT); } @Override @@ -107,6 +125,7 @@ public class FileSystemRMStateStore exte fs = fsWorkingPath.getFileSystem(conf); fs.mkdirs(rmDTSecretManagerRoot); fs.mkdirs(rmAppRoot); + fs.mkdirs(amrmTokenSecretManagerRoot); } @Override @@ -115,18 +134,18 @@ public class FileSystemRMStateStore exte } @Override - protected RMStateVersion getCurrentVersion() { + protected Version getCurrentVersion() { 
return CURRENT_VERSION_INFO; } @Override - protected synchronized RMStateVersion loadVersion() throws Exception { + protected synchronized Version loadVersion() throws Exception { Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); if (fs.exists(versionNodePath)) { FileStatus status = fs.getFileStatus(versionNodePath); byte[] data = readFile(versionNodePath, status.getLen()); - RMStateVersion version = - new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data)); + Version version = + new VersionPBImpl(VersionProto.parseFrom(data)); return version; } return null; @@ -136,14 +155,37 @@ public class FileSystemRMStateStore exte protected synchronized void storeVersion() throws Exception { Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE); byte[] data = - ((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); + ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray(); if (fs.exists(versionNodePath)) { updateFile(versionNodePath, data); } else { writeFile(versionNodePath, data); } } - + + @Override + public synchronized int getAndIncrementEpoch() throws Exception { + Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE); + int currentEpoch = 0; + if (fs.exists(epochNodePath)) { + // load current epoch + FileStatus status = fs.getFileStatus(epochNodePath); + byte[] data = readFile(epochNodePath, status.getLen()); + Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data)); + currentEpoch = epoch.getEpoch(); + // increment epoch and store it + byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + .toByteArray(); + updateFile(epochNodePath, storeData); + } else { + // initialize epoch file with 1 for the next time. 
+ byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto() + .toByteArray(); + writeFile(epochNodePath, storeData); + } + return currentEpoch; + } + @Override public synchronized RMState loadState() throws Exception { RMState rmState = new RMState(); @@ -151,9 +193,32 @@ public class FileSystemRMStateStore exte loadRMDTSecretManagerState(rmState); // recover RM applications loadRMAppState(rmState); + // recover AMRMTokenSecretManager + loadAMRMTokenSecretManagerState(rmState); return rmState; } + private void loadAMRMTokenSecretManagerState(RMState rmState) + throws Exception { + checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot); + Path amrmTokenSecretManagerStateDataDir = + new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE); + FileStatus status; + try { + status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir); + assert status.isFile(); + } catch (FileNotFoundException ex) { + return; + } + byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen()); + AMRMTokenSecretManagerStatePBImpl stateData = + new AMRMTokenSecretManagerStatePBImpl( + AMRMTokenSecretManagerStateProto.parseFrom(data)); + rmState.amrmTokenSecretManagerState = + AMRMTokenSecretManagerState.newInstance( + stateData.getCurrentMasterKey(), stateData.getNextMasterKey()); + } + private void loadRMAppState(RMState rmState) throws Exception { try { List<ApplicationAttemptState> attempts = @@ -214,7 +279,8 @@ public class FileSystemRMStateStore exte attemptStateData.getState(), attemptStateData.getFinalTrackingUrl(), attemptStateData.getDiagnostics(), - attemptStateData.getFinalApplicationStatus()); + attemptStateData.getFinalApplicationStatus(), + attemptStateData.getAMContainerExitStatus()); // assert child node name is same as application attempt id assert attemptId.equals(attemptState.getAttemptId()); @@ -314,7 +380,7 @@ public class FileSystemRMStateStore exte @Override public synchronized void storeApplicationStateInternal(ApplicationId 
appId, - ApplicationStateDataPBImpl appStateDataPB) throws Exception { + ApplicationStateData appStateDataPB) throws Exception { String appIdStr = appId.toString(); Path appDirPath = getAppDir(rmAppRoot, appIdStr); fs.mkdirs(appDirPath); @@ -334,7 +400,7 @@ public class FileSystemRMStateStore exte @Override public synchronized void updateApplicationStateInternal(ApplicationId appId, - ApplicationStateDataPBImpl appStateDataPB) throws Exception { + ApplicationStateData appStateDataPB) throws Exception { String appIdStr = appId.toString(); Path appDirPath = getAppDir(rmAppRoot, appIdStr); Path nodeCreatePath = getNodePath(appDirPath, appIdStr); @@ -354,7 +420,7 @@ public class FileSystemRMStateStore exte @Override public synchronized void storeApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptStateData attemptStateDataPB) throws Exception { Path appDirPath = getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); @@ -375,7 +441,7 @@ public class FileSystemRMStateStore exte @Override public synchronized void updateApplicationAttemptStateInternal( ApplicationAttemptId appAttemptId, - ApplicationAttemptStateDataPBImpl attemptStateDataPB) + ApplicationAttemptStateData attemptStateDataPB) throws Exception { Path appDirPath = getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString()); @@ -485,6 +551,13 @@ public class FileSystemRMStateStore exte deleteFile(nodeCreatePath); } + @Override + public synchronized void deleteStore() throws IOException { + if (fs.exists(rootDirPath)) { + fs.delete(rootDirPath, true); + } + } + private Path getAppDir(Path root, String appId) { return getNodePath(root, appId); } @@ -560,4 +633,25 @@ public class FileSystemRMStateStore exte return new Path(root, nodeName); } + @Override + public synchronized void storeOrUpdateAMRMTokenSecretManagerState( + AMRMTokenSecretManagerState amrmTokenSecretManagerState, + boolean isUpdate){ + Path 
nodeCreatePath = + getNodePath(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE); + AMRMTokenSecretManagerState data = + AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); + byte[] stateData = data.getProto().toByteArray(); + try { + if (isUpdate) { + updateFile(nodeCreatePath, stateData); + } else { + writeFile(nodeCreatePath, stateData); + } + } catch (Exception ex) { + LOG.info("Error storing info for AMRMTokenSecretManager", ex); + notifyStoreOperationFailed(ex); + } + } + }
