Author: bikas
Date: Tue Sep 10 17:43:51 2013
New Revision: 1521560
URL: http://svn.apache.org/r1521560
Log:
YARN-1098. Separate out RM services into Always On and Active (Karthik Kambatla
via bikas)
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1521560&r1=1521559&r2=1521560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Sep 10 17:43:51 2013
@@ -27,6 +27,8 @@ Release 2.3.0 - UNRELEASED
IMPROVEMENTS
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
+ YARN-1098. Separate out RM services into Always On and Active (Karthik
+ Kambatla via bikas)
OPTIMIZATIONS
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1521560&r1=1521559&r2=1521560&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Tue Sep 10 17:43:51 2013
@@ -107,9 +107,18 @@ public class ResourceManager extends Com
private static final Log LOG = LogFactory.getLog(ResourceManager.class);
public static final long clusterTimeStamp = System.currentTimeMillis();
+ /**
+ * "Active" services. Services that need to run only on the Active RM.
+ * These services are managed (initialized, started, stopped) by the
+ * {@link CompositeService} RMActiveServices.
+ *
+ * RM is active when (1) HA is disabled, or (2) HA is enabled and the RM is
+ * in Active state.
+ */
+ protected RMActiveServices activeServices;
protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager =
new ClientToAMTokenSecretManagerInRM();
-
+
protected RMContainerTokenSecretManager containerTokenSecretManager;
protected NMTokenSecretManagerInRM nmTokenSecretManager;
@@ -135,6 +144,8 @@ public class ResourceManager extends Com
protected ResourceTrackerService resourceTracker;
private boolean recoveryEnabled;
+ /** End of Active services */
+
private Configuration conf;
public ResourceManager() {
@@ -147,137 +158,11 @@ public class ResourceManager extends Com
@Override
protected void serviceInit(Configuration conf) throws Exception {
-
validateConfigs(conf);
-
this.conf = conf;
- this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
-
- this.rmDispatcher = createDispatcher();
- addIfService(this.rmDispatcher);
-
- this.amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
-
- this.containerAllocationExpirer = new ContainerAllocationExpirer(
- this.rmDispatcher);
- addService(this.containerAllocationExpirer);
-
- AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
- addService(amLivelinessMonitor);
-
- AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
- addService(amFinishingMonitor);
-
- this.containerTokenSecretManager = createContainerTokenSecretManager(conf);
- this.nmTokenSecretManager = createNMTokenSecretManager(conf);
-
- boolean isRecoveryEnabled = conf.getBoolean(
- YarnConfiguration.RECOVERY_ENABLED,
- YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
-
- RMStateStore rmStore = null;
- if(isRecoveryEnabled) {
- recoveryEnabled = true;
- rmStore = RMStateStoreFactory.getStore(conf);
- } else {
- recoveryEnabled = false;
- rmStore = new NullRMStateStore();
- }
-
- try {
- rmStore.init(conf);
- rmStore.setRMDispatcher(rmDispatcher);
- } catch (Exception e) {
- // the Exception from stateStore.init() needs to be handled for
- // HA and we need to give up master status if we got fenced
- LOG.error("Failed to init state store", e);
- ExitUtil.terminate(1, e);
- }
-
- if (UserGroupInformation.isSecurityEnabled()) {
- this.delegationTokenRenewer = createDelegationTokenRenewer();
- }
-
- this.rmContext =
- new RMContextImpl(this.rmDispatcher, rmStore,
- this.containerAllocationExpirer, amLivelinessMonitor,
- amFinishingMonitor, delegationTokenRenewer, this.amRmTokenSecretManager,
- this.containerTokenSecretManager, this.nmTokenSecretManager,
- this.clientToAMSecretManager);
-
- // Register event handler for NodesListManager
- this.nodesListManager = new NodesListManager(this.rmContext);
- this.rmDispatcher.register(NodesListManagerEventType.class,
- this.nodesListManager);
- addService(nodesListManager);
-
- // Initialize the scheduler
- this.scheduler = createScheduler();
- this.schedulerDispatcher = createSchedulerEventDispatcher();
- addIfService(this.schedulerDispatcher);
- this.rmDispatcher.register(SchedulerEventType.class,
- this.schedulerDispatcher);
-
- // Register event handler for RmAppEvents
- this.rmDispatcher.register(RMAppEventType.class,
- new ApplicationEventDispatcher(this.rmContext));
-
- // Register event handler for RmAppAttemptEvents
- this.rmDispatcher.register(RMAppAttemptEventType.class,
- new ApplicationAttemptEventDispatcher(this.rmContext));
-
- // Register event handler for RmNodes
- this.rmDispatcher.register(RMNodeEventType.class,
- new NodeEventDispatcher(this.rmContext));
-
- this.nmLivelinessMonitor = createNMLivelinessMonitor();
- addService(this.nmLivelinessMonitor);
-
- this.resourceTracker = createResourceTrackerService();
- addService(resourceTracker);
-
- DefaultMetricsSystem.initialize("ResourceManager");
- JvmMetrics.initSingleton("ResourceManager", null);
-
- try {
- this.scheduler.reinitialize(conf, this.rmContext);
- } catch (IOException ioe) {
- throw new RuntimeException("Failed to initialize scheduler", ioe);
- }
-
- // creating monitors that handle preemption
- createPolicyMonitors();
-
- masterService = createApplicationMasterService();
- addService(masterService) ;
-
- this.applicationACLsManager = new ApplicationACLsManager(conf);
-
- this.rmAppManager = createRMAppManager();
- // Register event handler for RMAppManagerEvents
- this.rmDispatcher.register(RMAppManagerEventType.class,
- this.rmAppManager);
- this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext);
- rmContext.setRMDelegationTokenSecretManager(this.rmDTSecretManager);
- clientRM = createClientRMService();
- rmContext.setClientRMService(clientRM);
- addService(clientRM);
-
- adminService = createAdminService(clientRM, masterService, resourceTracker);
- addService(adminService);
-
- this.applicationMasterLauncher = createAMLauncher();
- this.rmDispatcher.register(AMLauncherEventType.class,
- this.applicationMasterLauncher);
-
- addService(applicationMasterLauncher);
- if (UserGroupInformation.isSecurityEnabled()) {
- addService(delegationTokenRenewer);
- delegationTokenRenewer.setRMContext(rmContext);
- }
- new RMNMInfo(this.rmContext, this.scheduler);
-
+ activeServices = new RMActiveServices();
+ addService(activeServices);
super.serviceInit(conf);
}
@@ -378,6 +263,217 @@ public class ResourceManager extends Com
}
}
+ /**
+ * RMActiveServices handles all the Active services in the RM.
+ */
+ @Private
+ class RMActiveServices extends CompositeService {
+ RMActiveServices() {
+ super("RMActiveServices");
+ }
+
+ @Override
+ protected void serviceInit(Configuration configuration) throws Exception {
+ conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+ rmDispatcher = createDispatcher();
+ addIfService(rmDispatcher);
+
+ amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
+
+ containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
+ addService(containerAllocationExpirer);
+
+ AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
+ addService(amLivelinessMonitor);
+
+ AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
+ addService(amFinishingMonitor);
+
+ containerTokenSecretManager = createContainerTokenSecretManager(conf);
+ nmTokenSecretManager = createNMTokenSecretManager(conf);
+
+ boolean isRecoveryEnabled = conf.getBoolean(
+ YarnConfiguration.RECOVERY_ENABLED,
+ YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED);
+
+ RMStateStore rmStore = null;
+ if(isRecoveryEnabled) {
+ recoveryEnabled = true;
+ rmStore = RMStateStoreFactory.getStore(conf);
+ } else {
+ recoveryEnabled = false;
+ rmStore = new NullRMStateStore();
+ }
+
+ try {
+ rmStore.init(conf);
+ rmStore.setRMDispatcher(rmDispatcher);
+ } catch (Exception e) {
+ // the Exception from stateStore.init() needs to be handled for
+ // HA and we need to give up master status if we got fenced
+ LOG.error("Failed to init state store", e);
+ ExitUtil.terminate(1, e);
+ }
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ delegationTokenRenewer = createDelegationTokenRenewer();
+ }
+
+ rmContext = new RMContextImpl(
+ rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor,
+ amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager,
+ containerTokenSecretManager, nmTokenSecretManager,
+ clientToAMSecretManager);
+
+ // Register event handler for NodesListManager
+ nodesListManager = new NodesListManager(rmContext);
+ rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
+ addService(nodesListManager);
+
+ // Initialize the scheduler
+ scheduler = createScheduler();
+ schedulerDispatcher = createSchedulerEventDispatcher();
+ addIfService(schedulerDispatcher);
+ rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
+
+ // Register event handler for RmAppEvents
+ rmDispatcher.register(RMAppEventType.class,
+ new ApplicationEventDispatcher(rmContext));
+
+ // Register event handler for RmAppAttemptEvents
+ rmDispatcher.register(RMAppAttemptEventType.class,
+ new ApplicationAttemptEventDispatcher(rmContext));
+
+ // Register event handler for RmNodes
+ rmDispatcher.register(
+ RMNodeEventType.class, new NodeEventDispatcher(rmContext));
+
+ nmLivelinessMonitor = createNMLivelinessMonitor();
+ addService(nmLivelinessMonitor);
+
+ resourceTracker = createResourceTrackerService();
+ addService(resourceTracker);
+
+ DefaultMetricsSystem.initialize("ResourceManager");
+ JvmMetrics.initSingleton("ResourceManager", null);
+
+ try {
+ scheduler.reinitialize(conf, rmContext);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to initialize scheduler", ioe);
+ }
+
+ // creating monitors that handle preemption
+ createPolicyMonitors();
+
+ masterService = createApplicationMasterService();
+ addService(masterService) ;
+
+ applicationACLsManager = new ApplicationACLsManager(conf);
+
+ rmAppManager = createRMAppManager();
+ // Register event handler for RMAppManagerEvents
+ rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
+ rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
+ rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
+ clientRM = createClientRMService();
+ rmContext.setClientRMService(clientRM);
+ addService(clientRM);
+
+ adminService = createAdminService(clientRM, masterService, resourceTracker);
+ addService(adminService);
+
+ applicationMasterLauncher = createAMLauncher();
+ rmDispatcher.register(AMLauncherEventType.class,
+ applicationMasterLauncher);
+
+ addService(applicationMasterLauncher);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ addService(delegationTokenRenewer);
+ delegationTokenRenewer.setRMContext(rmContext);
+ }
+
+ new RMNMInfo(rmContext, scheduler);
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ amRmTokenSecretManager.start();
+ containerTokenSecretManager.start();
+ nmTokenSecretManager.start();
+
+ RMStateStore rmStore = rmContext.getStateStore();
+ // The state store needs to start irrespective of recoveryEnabled as apps
+ // need events to move to further states.
+ rmStore.start();
+
+ if(recoveryEnabled) {
+ try {
+ RMState state = rmStore.loadState();
+ recover(state);
+ } catch (Exception e) {
+ // the Exception from loadState() needs to be handled for
+ // HA and we need to give up master status if we got fenced
+ LOG.error("Failed to load/recover state", e);
+ ExitUtil.terminate(1, e);
+ }
+ }
+
+ startWepApp();
+ try {
+ rmDTSecretManager.startThreads();
+ } catch(IOException ie) {
+ throw new YarnRuntimeException("Failed to start secret manager threads", ie);
+ }
+
+ if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+ String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
+ hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
+ int port = webApp.port();
+ String resolvedAddress = hostname + ":" + port;
+ conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress);
+ }
+
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (webApp != null) {
+ webApp.stop();
+ }
+ if (rmDTSecretManager != null) {
+ rmDTSecretManager.stopThreads();
+ }
+
+ if (amRmTokenSecretManager != null) {
+ amRmTokenSecretManager.stop();
+ }
+ if (containerTokenSecretManager != null) {
+ containerTokenSecretManager.stop();
+ }
+ if(nmTokenSecretManager != null) {
+ nmTokenSecretManager.stop();
+ }
+
+ DefaultMetricsSystem.shutdown();
+
+ if (rmContext != null) {
+ RMStateStore store = rmContext.getStateStore();
+ try {
+ store.close();
+ } catch (Exception e) {
+ LOG.error("Error closing store.", e);
+ }
+ }
+ super.serviceStop();
+ }
+ }
+
@Private
public static class SchedulerEventDispatcher extends AbstractService
implements EventHandler<SchedulerEvent> {
@@ -620,54 +716,7 @@ public class ResourceManager extends Com
throw new YarnRuntimeException("Failed to login", ie);
}
- this.amRmTokenSecretManager.start();
- this.containerTokenSecretManager.start();
- this.nmTokenSecretManager.start();
-
- RMStateStore rmStore = rmContext.getStateStore();
- // The state store needs to start irrespective of recoveryEnabled as apps
- // need events to move to further states.
- rmStore.start();
-
- if(recoveryEnabled) {
- try {
- RMState state = rmStore.loadState();
- recover(state);
- } catch (Exception e) {
- // the Exception from loadState() needs to be handled for
- // HA and we need to give up master status if we got fenced
- LOG.error("Failed to load/recover state", e);
- ExitUtil.terminate(1, e);
- }
- }
-
- startWepApp();
- try {
- rmDTSecretManager.startThreads();
- } catch(IOException ie) {
- throw new YarnRuntimeException("Failed to start secret manager threads", ie);
- }
-
- if (getConfig().getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
- String hostname = getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS,
- YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS);
- hostname = (hostname.contains(":")) ? hostname.substring(0, hostname.indexOf(":")) : hostname;
- int port = webApp.port();
- String resolvedAddress = hostname + ":" + port;
- conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, resolvedAddress);
- }
-
super.serviceStart();
-
- /*synchronized(shutdown) {
- try {
- while(!shutdown.get()) {
- shutdown.wait();
- }
- } catch(InterruptedException ie) {
- LOG.info("Interrupted while waiting", ie);
- }
- }*/
}
protected void doSecureLogin() throws IOException {
@@ -677,39 +726,6 @@ public class ResourceManager extends Com
@Override
protected void serviceStop() throws Exception {
- if (webApp != null) {
- webApp.stop();
- }
- if (rmDTSecretManager != null) {
- rmDTSecretManager.stopThreads();
- }
-
- if (amRmTokenSecretManager != null) {
- this.amRmTokenSecretManager.stop();
- }
- if (containerTokenSecretManager != null) {
- this.containerTokenSecretManager.stop();
- }
- if(nmTokenSecretManager != null) {
- nmTokenSecretManager.stop();
- }
-
- /*synchronized(shutdown) {
- shutdown.set(true);
- shutdown.notifyAll();
- }*/
-
- DefaultMetricsSystem.shutdown();
-
- if (rmContext != null) {
- RMStateStore store = rmContext.getStateStore();
- try {
- store.close();
- } catch (Exception e) {
- LOG.error("Error closing store.", e);
- }
- }
-
super.serviceStop();
}