Repository: hadoop Updated Branches: refs/heads/YARN-5355-branch-2 bb4f44038 -> 278ca2f24
YARN-6102. RMActiveService context to be updated with new RMContext on failover. Contributed by Rohith Sharma K S. (cherry picked from commit a657472b42c58f87fd3165e0a746d83b72182a24) Conflicts: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c602f05b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c602f05b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c602f05b Branch: refs/heads/YARN-5355-branch-2 Commit: c602f05b82a4f44448ec11efe42d2d90d3b2cb59 Parents: bb4f440 Author: Sunil G <sun...@apache.org> Authored: Mon Jul 24 20:57:25 2017 +0530 Committer: Varun Saxena <varunsax...@apache.org> Committed: Fri Jul 28 22:48:33 2017 +0530 ---------------------------------------------------------------------- ...ActiveStandbyElectorBasedElectorService.java | 12 +- .../server/resourcemanager/AdminService.java | 71 ++--- .../CuratorBasedElectorService.java | 10 +- .../resourcemanager/RMActiveServiceContext.java | 15 + .../server/resourcemanager/RMContextImpl.java | 294 ++++++++++--------- .../resourcemanager/RMServiceContext.java | 151 ++++++++++ .../server/resourcemanager/ResourceManager.java | 28 +- .../yarn/server/resourcemanager/MockRM.java | 2 +- .../resourcemanager/TestRMEmbeddedElector.java | 8 +- .../yarn/server/resourcemanager/TestRMHA.java | 16 +- 10 files changed, 406 insertions(+), 201 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java index 751eedd..7e41399 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java @@ -57,7 +57,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC); - private RMContext rmContext; + private ResourceManager rm; private byte[] localActiveNodeInfo; private ActiveStandbyElector elector; @@ -66,9 +66,9 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService @VisibleForTesting final Object zkDisconnectLock = new Object(); - ActiveStandbyElectorBasedElectorService(RMContext rmContext) { + ActiveStandbyElectorBasedElectorService(ResourceManager rm) { super(ActiveStandbyElectorBasedElectorService.class.getName()); - this.rmContext = rmContext; + this.rm = rm; } @Override @@ -139,7 +139,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService 
cancelDisconnectTimer(); try { - rmContext.getRMAdminService().transitionToActive(req); + rm.getRMContext().getRMAdminService().transitionToActive(req); } catch (Exception e) { throw new ServiceFailedException("RM could not transition to Active", e); } @@ -150,7 +150,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService cancelDisconnectTimer(); try { - rmContext.getRMAdminService().transitionToStandby(req); + rm.getRMContext().getRMAdminService().transitionToStandby(req); } catch (Exception e) { LOG.error("RM could not transition to Standby", e); } @@ -204,7 +204,7 @@ public class ActiveStandbyElectorBasedElectorService extends AbstractService @SuppressWarnings(value = "unchecked") @Override public void notifyFatalError(String errorMessage) { - rmContext.getDispatcher().getEventHandler().handle( + rm.getRMContext().getDispatcher().getEventHandler().handle( new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 74c87a2..afea100 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -102,7 +102,6 
@@ public class AdminService extends CompositeService implements private static final Log LOG = LogFactory.getLog(AdminService.class); - private final RMContext rmContext; private final ResourceManager rm; private String rmId; @@ -123,16 +122,16 @@ public class AdminService extends CompositeService implements @VisibleForTesting boolean isCentralizedNodeLabelConfiguration = true; - public AdminService(ResourceManager rm, RMContext rmContext) { + public AdminService(ResourceManager rm) { super(AdminService.class.getName()); this.rm = rm; - this.rmContext = rmContext; } @Override public void serviceInit(Configuration conf) throws Exception { autoFailoverEnabled = - rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf); + rm.getRMContext().isHAEnabled() + && HAUtil.isAutomaticFailoverEnabled(conf); masterServiceBindAddress = conf.getSocketAddr( YarnConfiguration.RM_BIND_HOST, @@ -189,7 +188,7 @@ public class AdminService extends CompositeService implements RMPolicyProvider.getInstance()); } - if (rmContext.isHAEnabled()) { + if (rm.getRMContext().isHAEnabled()) { RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, ProtobufRpcEngine.class); @@ -265,7 +264,7 @@ public class AdminService extends CompositeService implements } private synchronized boolean isRMActive() { - return HAServiceState.ACTIVE == rmContext.getHAServiceState(); + return HAServiceState.ACTIVE == rm.getRMContext().getHAServiceState(); } private void throwStandbyException() throws StandbyException { @@ -305,7 +304,7 @@ public class AdminService extends CompositeService implements refreshAll(); } catch (Exception e) { LOG.error("RefreshAll failed so firing fatal event", e); - rmContext + rm.getRMContext() .getDispatcher() .getEventHandler() .handle( @@ -364,7 +363,7 @@ public class AdminService extends CompositeService implements @Override public synchronized HAServiceStatus getServiceStatus() throws IOException { checkAccess("getServiceState"); - HAServiceState haState = 
rmContext.getHAServiceState(); + HAServiceState haState = rm.getRMContext().getHAServiceState(); HAServiceStatus ret = new HAServiceStatus(haState); if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { ret.setReadyToBecomeActive(); @@ -396,11 +395,12 @@ public class AdminService extends CompositeService implements } private void refreshQueues() throws IOException, YarnException { - rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); + rm.getRMContext().getScheduler().reinitialize(getConfig(), + this.rm.getRMContext()); // refresh the reservation system - ReservationSystem rSystem = rmContext.getReservationSystem(); + ReservationSystem rSystem = rm.getRMContext().getReservationSystem(); if (rSystem != null) { - rSystem.reinitialize(getConfig(), rmContext); + rSystem.reinitialize(getConfig(), rm.getRMContext()); } } @@ -419,14 +419,14 @@ public class AdminService extends CompositeService implements YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); switch (request.getDecommissionType()) { case NORMAL: - rmContext.getNodesListManager().refreshNodes(conf); + rm.getRMContext().getNodesListManager().refreshNodes(conf); break; case GRACEFUL: - rmContext.getNodesListManager().refreshNodesGracefully( + rm.getRMContext().getNodesListManager().refreshNodesGracefully( conf, request.getDecommissionTimeout()); break; case FORCEFUL: - rmContext.getNodesListManager().refreshNodesForcefully(); + rm.getRMContext().getNodesListManager().refreshNodesForcefully(); break; } RMAuditLogger.logSuccess(user.getShortUserName(), operation, @@ -441,7 +441,7 @@ public class AdminService extends CompositeService implements Configuration conf = getConfiguration(new Configuration(false), YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - rmContext.getNodesListManager().refreshNodes(conf); + rm.getRMContext().getNodesListManager().refreshNodes(conf); } @Override @@ -560,10 +560,11 @@ public class AdminService extends CompositeService implements Configuration conf 
= getConfiguration(new Configuration(false), YarnConfiguration.HADOOP_POLICY_CONFIGURATION_FILE); - rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider); - rmContext.getApplicationMasterService().refreshServiceAcls( + rm.getRMContext().getClientRMService().refreshServiceAcls(conf, + policyProvider); + rm.getRMContext().getApplicationMasterService().refreshServiceAcls( conf, policyProvider); - rmContext.getResourceTrackerService().refreshServiceAcls( + rm.getRMContext().getResourceTrackerService().refreshServiceAcls( conf, policyProvider); } @@ -602,7 +603,7 @@ public class AdminService extends CompositeService implements // if any invalid nodes, throw exception instead of partially updating // valid nodes. for (NodeId nodeId : nodeIds) { - RMNode node = this.rmContext.getRMNodes().get(nodeId); + RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId); if (node == null) { LOG.error("Resource update get failed on all nodes due to change " + "resource on an unrecognized node: " + nodeId); @@ -620,14 +621,14 @@ public class AdminService extends CompositeService implements for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) { ResourceOption newResourceOption = entry.getValue(); NodeId nodeId = entry.getKey(); - RMNode node = this.rmContext.getRMNodes().get(nodeId); + RMNode node = this.rm.getRMContext().getRMNodes().get(nodeId); if (node == null) { LOG.warn("Resource update get failed on an unrecognized node: " + nodeId); allSuccess = false; } else { // update resource to RMNode - this.rmContext.getDispatcher().getEventHandler() + this.rm.getRMContext().getDispatcher().getEventHandler() .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption)); LOG.info("Update resource on node(" + node.getNodeID() + ") with resource(" + newResourceOption.toString() + ")"); @@ -662,7 +663,8 @@ public class AdminService extends CompositeService implements DynamicResourceConfiguration newConf; InputStream drInputStream = - 
this.rmContext.getConfigurationProvider().getConfigurationInputStream( + this.rm.getRMContext().getConfigurationProvider() + .getConfigurationInputStream( configuration, YarnConfiguration.DR_CONFIGURATION_FILE); if (drInputStream != null) { @@ -680,7 +682,7 @@ public class AdminService extends CompositeService implements updateNodeResource(updateRequest); } // refresh dynamic resource in ResourceTrackerService - this.rmContext.getResourceTrackerService(). + this.rm.getRMContext().getResourceTrackerService(). updateDynamicResourceConfiguration(newConf); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -693,7 +695,8 @@ public class AdminService extends CompositeService implements private synchronized Configuration getConfiguration(Configuration conf, String... confFileNames) throws YarnException, IOException { for (String confFileName : confFileNames) { - InputStream confFileInputStream = this.rmContext.getConfigurationProvider() + InputStream confFileInputStream = + this.rm.getRMContext().getConfigurationProvider() .getConfigurationInputStream(conf, confFileName); if (confFileInputStream != null) { conf.addResource(confFileInputStream); @@ -747,7 +750,7 @@ public class AdminService extends CompositeService implements AddToClusterNodeLabelsResponse response = recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class); try { - rmContext.getNodeLabelManager() + rm.getRMContext().getNodeLabelManager() .addToCluserNodeLabels(request.getNodeLabels()); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -770,7 +773,8 @@ public class AdminService extends CompositeService implements RemoveFromClusterNodeLabelsResponse response = recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class); try { - rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels()); + rm.getRMContext().getNodeLabelManager() + .removeFromClusterNodeLabels(request.getNodeLabels()); 
RMAuditLogger .logSuccess(user.getShortUserName(), operation, "AdminService"); return response; @@ -806,19 +810,20 @@ public class AdminService extends CompositeService implements boolean isKnown = false; // both active and inactive nodes are recognized as known nodes if (requestedNode.getPort() != 0) { - if (rmContext.getRMNodes().containsKey(requestedNode) - || rmContext.getInactiveRMNodes().containsKey(requestedNode)) { + if (rm.getRMContext().getRMNodes().containsKey(requestedNode) || rm + .getRMContext().getInactiveRMNodes().containsKey(requestedNode)) { isKnown = true; } } else { - for (NodeId knownNode : rmContext.getRMNodes().keySet()) { + for (NodeId knownNode : rm.getRMContext().getRMNodes().keySet()) { if (knownNode.getHost().equals(requestedNode.getHost())) { isKnown = true; break; } } if (!isKnown) { - for (NodeId knownNode : rmContext.getInactiveRMNodes().keySet()) { + for (NodeId knownNode : rm.getRMContext().getInactiveRMNodes() + .keySet()) { if (knownNode.getHost().equals(requestedNode.getHost())) { isKnown = true; break; @@ -842,7 +847,7 @@ public class AdminService extends CompositeService implements } } try { - rmContext.getNodeLabelManager().replaceLabelsOnNode( + rm.getRMContext().getNodeLabelManager().replaceLabelsOnNode( request.getNodeToLabels()); RMAuditLogger .logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -879,7 +884,7 @@ public class AdminService extends CompositeService implements checkRMStatus(user.getShortUserName(), operation, msg); - Set<NodeId> decommissioningNodes = rmContext.getNodesListManager() + Set<NodeId> decommissioningNodes = rm.getRMContext().getNodesListManager() .checkForDecommissioningNodes(); RMAuditLogger.logSuccess(user.getShortUserName(), operation, "AdminService"); @@ -915,6 +920,6 @@ public class AdminService extends CompositeService implements getConfiguration(new Configuration(false), YarnConfiguration.YARN_SITE_CONFIGURATION_FILE); - rmContext.getScheduler().setClusterMaxPriority(conf); + 
rm.getRMContext().getScheduler().setClusterMaxPriority(conf); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java index bcdf48b..d7485f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java @@ -45,14 +45,12 @@ public class CuratorBasedElectorService extends AbstractService LogFactory.getLog(CuratorBasedElectorService.class); private LeaderLatch leaderLatch; private CuratorFramework curator; - private RMContext rmContext; private String latchPath; private String rmId; private ResourceManager rm; - public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) { + public CuratorBasedElectorService(ResourceManager rm) { super(CuratorBasedElectorService.class.getName()); - this.rmContext = rmContext; this.rm = rm; } @@ -102,7 +100,8 @@ public class CuratorBasedElectorService extends AbstractService public void isLeader() { LOG.info(rmId + "is elected leader, transitioning to active"); try { - rmContext.getRMAdminService().transitionToActive( + rm.getRMContext().getRMAdminService() + .transitionToActive( new 
HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC)); } catch (Exception e) { @@ -123,7 +122,8 @@ public class CuratorBasedElectorService extends AbstractService public void notLeader() { LOG.info(rmId + " relinquish leadership"); try { - rmContext.getRMAdminService().transitionToStandby( + rm.getRMContext().getRMAdminService() + .transitionToStandby( new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC)); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index 0e305a9..4844eba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor.RMAppLifetime import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -107,6 +108,7 @@ public class RMActiveServiceContext { private PlacementManager queuePlacementManager = null; private RMAppLifetimeMonitor rmAppLifetimeMonitor; + private QueueLimitCalculator queueLimitCalculator; public RMActiveServiceContext() { queuePlacementManager = new PlacementManager(); @@ -483,4 +485,17 @@ public class RMActiveServiceContext { public RMAppLifetimeMonitor getRMAppLifetimeMonitor() { return this.rmAppLifetimeMonitor; } + + @Private + @Unstable + public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { + return this.queueLimitCalculator; + } + + @Private + @Unstable + public void setContainerQueueLimitCalculator( + QueueLimitCalculator limitCalculator) { + this.queueLimitCalculator = limitCalculator; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index fb160c4..ab3672e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -57,37 +57,39 @@ import org.apache.hadoop.yarn.util.Clock; import com.google.common.annotations.VisibleForTesting; +/** + * RMContextImpl class holds two service contexts. + * <ul> + * <li>serviceContext : These services are called <b>Always On</b> services. + * Services that need to run always irrespective of the HA state of the RM.</li> + * <li>activeServiceContext : Active services context. Services that need to run + * only on the Active RM.</li> + * </ul> + * <p> + * <b>Note:</b> If any new service is to be added to the context, add it to the right + * context as per above description. + */ public class RMContextImpl implements RMContext { - private Dispatcher rmDispatcher; - - private boolean isHAEnabled; - - private HAServiceState haServiceState = - HAServiceProtocol.HAServiceState.INITIALIZING; - - private AdminService adminService; - - private ConfigurationProvider configurationProvider; + /** + * RM service context which runs throughout the RM life span. These are created + * once during start of RM. + */ + private RMServiceContext serviceContext; + /** + * RM Active service context. This will be recreated for every transition from + * ACTIVE->STANDBY. + */ + private RMActiveServiceContext activeServiceContext; - private Configuration yarnConfiguration; - - private RMApplicationHistoryWriter rmApplicationHistoryWriter; - private SystemMetricsPublisher systemMetricsPublisher; - private EmbeddedElector elector; - - private QueueLimitCalculator queueLimitCalculator; - - private final Object haServiceStateLock = new Object(); - - private ResourceManager resourceManager; /** * Default constructor. To be used in conjunction with setter methods for * individual fields. 
*/ public RMContextImpl() { + this.serviceContext = new RMServiceContext(); + this.activeServiceContext = new RMActiveServiceContext(); } @VisibleForTesting @@ -138,19 +140,143 @@ public class RMContextImpl implements RMContext { clientToAMTokenSecretManager, null); } + /** + * RM service context which runs throughout the JVM life span. These are created + * once during start of RM. + * @return serviceContext of RM + */ + @Private + @Unstable + public RMServiceContext getServiceContext() { + return serviceContext; + } + + /** + * <b>Note:</b> setting service context clears all services embedded with it. + * @param context rm service context + */ + @Private + @Unstable + public void setServiceContext(RMServiceContext context) { + this.serviceContext = context; + } + @Override - public Dispatcher getDispatcher() { - return this.rmDispatcher; + public ResourceManager getResourceManager() { + return serviceContext.getResourceManager(); + } + + public void setResourceManager(ResourceManager rm) { + serviceContext.setResourceManager(rm); + } + + @Override + public EmbeddedElector getLeaderElectorService() { + return serviceContext.getLeaderElectorService(); } @Override public void setLeaderElectorService(EmbeddedElector elector) { - this.elector = elector; + serviceContext.setLeaderElectorService(elector); } @Override - public EmbeddedElector getLeaderElectorService() { - return this.elector; + public Dispatcher getDispatcher() { + return serviceContext.getDispatcher(); + } + + void setDispatcher(Dispatcher dispatcher) { + serviceContext.setDispatcher(dispatcher); + } + + @Override + public AdminService getRMAdminService() { + return serviceContext.getRMAdminService(); + } + + void setRMAdminService(AdminService adminService) { + serviceContext.setRMAdminService(adminService); + } + + @Override + public boolean isHAEnabled() { + return serviceContext.isHAEnabled(); + } + + void setHAEnabled(boolean isHAEnabled) { + serviceContext.setHAEnabled(isHAEnabled); + } + + 
@Override + public HAServiceState getHAServiceState() { + return serviceContext.getHAServiceState(); + } + + void setHAServiceState(HAServiceState serviceState) { + serviceContext.setHAServiceState(serviceState); + } + + @Override + public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { + return serviceContext.getRMApplicationHistoryWriter(); + } + + @Override + public void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter rmApplicationHistoryWriter) { + serviceContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + } + + @Override + public SystemMetricsPublisher getSystemMetricsPublisher() { + return serviceContext.getSystemMetricsPublisher(); + } + + @Override + public void setSystemMetricsPublisher( + SystemMetricsPublisher metricsPublisher) { + serviceContext.setSystemMetricsPublisher(metricsPublisher); + } + + @Override + public ConfigurationProvider getConfigurationProvider() { + return serviceContext.getConfigurationProvider(); + } + + public void setConfigurationProvider( + ConfigurationProvider configurationProvider) { + serviceContext.setConfigurationProvider(configurationProvider); + } + + @Override + public Configuration getYarnConfiguration() { + return serviceContext.getYarnConfiguration(); + } + + public void setYarnConfiguration(Configuration yarnConfiguration) { + serviceContext.setYarnConfiguration(yarnConfiguration); + } + + public String getHAZookeeperConnectionState() { + return serviceContext.getHAZookeeperConnectionState(); + } + + // ========================================================================== + /** + * RM Active service context. This will be recreated for every transition from + * ACTIVE to STANDBY. 
+ * @return activeServiceContext of active services + */ + @Private + @Unstable + public RMActiveServiceContext getActiveServiceContext() { + return activeServiceContext; + } + + @Private + @Unstable + void setActiveServiceContext(RMActiveServiceContext activeServiceContext) { + this.activeServiceContext = activeServiceContext; } @Override @@ -228,11 +354,6 @@ public class RMContextImpl implements RMContext { return activeServiceContext.getClientToAMTokenSecretManager(); } - @Override - public AdminService getRMAdminService() { - return this.adminService; - } - @VisibleForTesting public void setStateStore(RMStateStore store) { activeServiceContext.setStateStore(store); @@ -253,24 +374,6 @@ public class RMContextImpl implements RMContext { return activeServiceContext.getResourceTrackerService(); } - void setHAEnabled(boolean isHAEnabled) { - this.isHAEnabled = isHAEnabled; - } - - void setHAServiceState(HAServiceState serviceState) { - synchronized (haServiceStateLock) { - this.haServiceState = serviceState; - } - } - - void setDispatcher(Dispatcher dispatcher) { - this.rmDispatcher = dispatcher; - } - - void setRMAdminService(AdminService adminService) { - this.adminService = adminService; - } - @Override public void setClientRMService(ClientRMService clientRMService) { activeServiceContext.setClientRMService(clientRMService); @@ -348,18 +451,6 @@ public class RMContextImpl implements RMContext { activeServiceContext.setResourceTrackerService(resourceTrackerService); } - @Override - public boolean isHAEnabled() { - return isHAEnabled; - } - - @Override - public HAServiceState getHAServiceState() { - synchronized (haServiceStateLock) { - return haServiceState; - } - } - public void setWorkPreservingRecoveryEnabled(boolean enabled) { activeServiceContext.setWorkPreservingRecoveryEnabled(enabled); } @@ -370,11 +461,6 @@ public class RMContextImpl implements RMContext { } @Override - public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { - return 
this.rmApplicationHistoryWriter; - } - - @Override public void setRMTimelineCollectorManager( RMTimelineCollectorManager timelineCollectorManager) { activeServiceContext.setRMTimelineCollectorManager( @@ -382,39 +468,6 @@ public class RMContextImpl implements RMContext { } @Override - public RMTimelineCollectorManager getRMTimelineCollectorManager() { - return activeServiceContext.getRMTimelineCollectorManager(); - } - - @Override - public void setSystemMetricsPublisher( - SystemMetricsPublisher metricsPublisher) { - this.systemMetricsPublisher = metricsPublisher; - } - - @Override - public SystemMetricsPublisher getSystemMetricsPublisher() { - return this.systemMetricsPublisher; - } - - @Override - public void setRMApplicationHistoryWriter( - RMApplicationHistoryWriter rmApplicationHistoryWriter) { - this.rmApplicationHistoryWriter = rmApplicationHistoryWriter; - - } - - @Override - public ConfigurationProvider getConfigurationProvider() { - return this.configurationProvider; - } - - public void setConfigurationProvider( - ConfigurationProvider configurationProvider) { - this.configurationProvider = configurationProvider; - } - - @Override public long getEpoch() { return activeServiceContext.getEpoch(); } @@ -463,27 +516,6 @@ public class RMContextImpl implements RMContext { return activeServiceContext.getSystemCredentialsForApps(); } - @Private - @Unstable - public RMActiveServiceContext getActiveServiceContext() { - return activeServiceContext; - } - - @Private - @Unstable - void setActiveServiceContext(RMActiveServiceContext activeServiceContext) { - this.activeServiceContext = activeServiceContext; - } - - @Override - public Configuration getYarnConfiguration() { - return this.yarnConfiguration; - } - - public void setYarnConfiguration(Configuration yarnConfiguration) { - this.yarnConfiguration=yarnConfiguration; - } - @Override public PlacementManager getQueuePlacementManager() { return this.activeServiceContext.getQueuePlacementManager(); @@ -496,12 +528,12 
@@ public class RMContextImpl implements RMContext { @Override public QueueLimitCalculator getNodeManagerQueueLimitCalculator() { - return this.queueLimitCalculator; + return activeServiceContext.getNodeManagerQueueLimitCalculator(); } public void setContainerQueueLimitCalculator( QueueLimitCalculator limitCalculator) { - this.queueLimitCalculator = limitCalculator; + activeServiceContext.setContainerQueueLimitCalculator(limitCalculator); } @Override @@ -515,21 +547,5 @@ public class RMContextImpl implements RMContext { return this.activeServiceContext.getRMAppLifetimeMonitor(); } - public String getHAZookeeperConnectionState() { - if (elector == null) { - return "Could not find leader elector. Verify both HA and automatic " + - "failover are enabled."; - } else { - return elector.getZookeeperConnectionState(); - } - } - - @Override - public ResourceManager getResourceManager() { - return resourceManager; - } - - public void setResourceManager(ResourceManager rm) { - this.resourceManager = rm; - } + // Note: Read java doc before adding any services over here. 
} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java new file mode 100644 index 0000000..fe34d63 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServiceContext.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.yarn.conf.ConfigurationProvider; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; + +/** + * RMServiceContext maintains the "Always On" services, i.e. services that need to + * run always, irrespective of the HA state of the RM. It is created during + * initialization of RMContextImpl. + * <p> + * <b>Note:</b> If any service is to be added to this class, make sure that service + * will always be running, irrespective of the HA state of the RM. + */ +@Private +@Unstable +public class RMServiceContext { + + private Dispatcher rmDispatcher; + private boolean isHAEnabled; + private HAServiceState haServiceState = + HAServiceProtocol.HAServiceState.INITIALIZING; + private AdminService adminService; + private ConfigurationProvider configurationProvider; + private Configuration yarnConfiguration; + private RMApplicationHistoryWriter rmApplicationHistoryWriter; + private SystemMetricsPublisher systemMetricsPublisher; + private EmbeddedElector elector; + private final Object haServiceStateLock = new Object(); + private ResourceManager resourceManager; + + public ResourceManager getResourceManager() { + return resourceManager; + } + + public void setResourceManager(ResourceManager rm) { + this.resourceManager = rm; + } + + public ConfigurationProvider getConfigurationProvider() { + return this.configurationProvider; + } + + public void setConfigurationProvider( + ConfigurationProvider configurationProvider) { + this.configurationProvider 
= configurationProvider; + } + + public Dispatcher getDispatcher() { + return this.rmDispatcher; + } + + void setDispatcher(Dispatcher dispatcher) { + this.rmDispatcher = dispatcher; + } + + public EmbeddedElector getLeaderElectorService() { + return this.elector; + } + + public void setLeaderElectorService(EmbeddedElector embeddedElector) { + this.elector = embeddedElector; + } + + public AdminService getRMAdminService() { + return this.adminService; + } + + void setRMAdminService(AdminService service) { + this.adminService = service; + } + + void setHAEnabled(boolean rmHAEnabled) { + this.isHAEnabled = rmHAEnabled; + } + + public boolean isHAEnabled() { + return isHAEnabled; + } + + public HAServiceState getHAServiceState() { + synchronized (haServiceStateLock) { + return haServiceState; + } + } + + void setHAServiceState(HAServiceState serviceState) { + synchronized (haServiceStateLock) { + this.haServiceState = serviceState; + } + } + + public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { + return this.rmApplicationHistoryWriter; + } + + public void setRMApplicationHistoryWriter( + RMApplicationHistoryWriter applicationHistoryWriter) { + this.rmApplicationHistoryWriter = applicationHistoryWriter; + } + + public void setSystemMetricsPublisher( + SystemMetricsPublisher metricsPublisher) { + this.systemMetricsPublisher = metricsPublisher; + } + + public SystemMetricsPublisher getSystemMetricsPublisher() { + return this.systemMetricsPublisher; + } + + public Configuration getYarnConfiguration() { + return this.yarnConfiguration; + } + + public void setYarnConfiguration(Configuration yarnConfiguration) { + this.yarnConfiguration = yarnConfiguration; + } + + public String getHAZookeeperConnectionState() { + if (elector == null) { + return "Could not find leader elector. 
Verify both HA and automatic " + + "failover are enabled."; + } else { + return elector.getZookeeperConnectionState(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 1adef33..d8de137 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -347,9 +347,9 @@ public class ResourceManager extends CompositeService implements Recoverable { YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); if (curatorEnabled) { this.curator = createAndStartCurator(conf); - elector = new CuratorBasedElectorService(rmContext, this); + elector = new CuratorBasedElectorService(this); } else { - elector = new ActiveStandbyElectorBasedElectorService(rmContext); + elector = new ActiveStandbyElectorBasedElectorService(this); } return elector; } @@ -562,7 +562,6 @@ public class ResourceManager extends CompositeService implements Recoverable { private ApplicationMasterLauncher applicationMasterLauncher; private ContainerAllocationExpirer containerAllocationExpirer; private ResourceManager rm; - private RMActiveServiceContext activeServiceContext; private boolean fromActive = false; private 
StandByTransitionRunnable standByTransitionRunnable; @@ -575,9 +574,6 @@ public class ResourceManager extends CompositeService implements Recoverable { protected void serviceInit(Configuration configuration) throws Exception { standByTransitionRunnable = new StandByTransitionRunnable(); - activeServiceContext = new RMActiveServiceContext(); - rmContext.setActiveServiceContext(activeServiceContext); - conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); rmSecretManagerService = createRMSecretManagerService(); addService(rmSecretManagerService); @@ -1135,7 +1131,7 @@ public class ResourceManager extends CompositeService implements Recoverable { ClusterMetrics.destroy(); QueueMetrics.clearQueueMetrics(); if (initialize) { - resetDispatcher(); + resetRMContext(); createAndInitActiveServices(true); } } @@ -1280,7 +1276,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } protected AdminService createAdminService() { - return new AdminService(this, rmContext); + return new AdminService(this); } protected RMSecretManagerService createRMSecretManagerService() { @@ -1403,16 +1399,24 @@ public class ResourceManager extends CompositeService implements Recoverable { return dispatcher; } - private void resetDispatcher() { + private void resetRMContext() { + RMContextImpl rmContextImpl = new RMContextImpl(); + // transfer service context to new RM service Context + rmContextImpl.setServiceContext(rmContext.getServiceContext()); + + // reset dispatcher Dispatcher dispatcher = setupDispatcher(); - ((Service)dispatcher).init(this.conf); - ((Service)dispatcher).start(); - removeService((Service)rmDispatcher); + ((Service) dispatcher).init(this.conf); + ((Service) dispatcher).start(); + removeService((Service) rmDispatcher); // Need to stop previous rmDispatcher before assigning new dispatcher // otherwise causes "AsyncDispatcher event handler" thread leak ((Service) rmDispatcher).stop(); rmDispatcher = dispatcher; 
addIfService(rmDispatcher); + rmContextImpl.setDispatcher(dispatcher); + + rmContext = rmContextImpl; rmContext.setDispatcher(rmDispatcher); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index aca2fc5..b0ad977 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -1042,7 +1042,7 @@ public class MockRM extends ResourceManager { @Override protected AdminService createAdminService() { - return new AdminService(this, getRMContext()) { + return new AdminService(this) { @Override protected void startServer() { // override to not start rpc handler http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index c4fcc5d..47d18f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -123,13 +123,15 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { throws IOException, InterruptedException { AdminService as = mock(AdminService.class); RMContext rc = mock(RMContext.class); + ResourceManager rm = mock(ResourceManager.class); Configuration myConf = new Configuration(conf); myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50); + when(rm.getRMContext()).thenReturn(rc); when(rc.getRMAdminService()).thenReturn(as); - ActiveStandbyElectorBasedElectorService - ees = new ActiveStandbyElectorBasedElectorService(rc); + ActiveStandbyElectorBasedElectorService ees = + new ActiveStandbyElectorBasedElectorService(rm); ees.init(myConf); ees.enterNeutralMode(); @@ -291,7 +293,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { @Override protected EmbeddedElector createEmbeddedElector() { - return new ActiveStandbyElectorBasedElectorService(getRMContext()) { + return new ActiveStandbyElectorBasedElectorService(this) { @Override public void becomeActive() throws ServiceFailedException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/c602f05b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 0efda9e..a558dd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -70,6 +70,7 @@ public class TestRMHA { private Log LOG = LogFactory.getLog(TestRMHA.class); private Configuration configuration; private MockRM rm = null; + private MockNM nm = null; private RMApp app = null; private RMAppAttempt attempt = null; private static final String STATE_ERR = @@ -134,7 +135,7 @@ public class TestRMHA { try { rm.getNewAppId(); - rm.registerNode("127.0.0.1:1", 2048); + nm = rm.registerNode("127.0.0.1:1", 2048); app = rm.submitApp(1024); attempt = app.getCurrentAppAttempt(); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.SCHEDULED); @@ -549,6 +550,17 @@ public class TestRMHA { verifyClusterMetrics(1, 1, 1, 1, 2048, 1); assertEquals(1, rm.getRMContext().getRMNodes().size()); assertEquals(1, rm.getRMContext().getRMApps().size()); + Assert.assertNotNull("Node not registered", nm); + + rm.adminService.transitionToStandby(requestInfo); + checkMonitorHealth(); + checkStandbyRMFunctionality(); + // A race condition can let a node register/heartbeat even after the service + // is stopping/stopped. A new RMContext is created on every transition + // to standby, so metrics should be 0, which indicates that the new context + // reference has been taken. + nm.registerNode(); + verifyClusterMetrics(0, 0, 0, 0, 0, 0); // 3. 
Create new RM rm = new MockRM(conf, memStore) { @@ -590,7 +602,7 @@ public class TestRMHA { rm = new MockRM(configuration) { @Override protected AdminService createAdminService() { - return new AdminService(this, getRMContext()) { + return new AdminService(this) { int counter = 0; @Override protected void setConfig(Configuration conf) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org