Author: kasha
Date: Wed Dec 11 15:15:35 2013
New Revision: 1550168
URL: http://svn.apache.org/r1550168
Log:
YARN-1481. Move internal services logic from AdminService to ResourceManager.
(vinodkv via kasha)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1550168&r1=1550167&r2=1550168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Dec 11 15:15:35 2013
@@ -136,6 +136,9 @@ Release 2.4.0 - UNRELEASED
YARN-1378. Implemented a cleaner of old finished applications from the RM
state-store. (Jian He via vinodkv)
+ YARN-1481. Move internal services logic from AdminService to ResourceManager.
+ (vinodkv via kasha)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1550168&r1=1550167&r2=1550168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java Wed Dec 11 15:15:35 2013
@@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.net.InetSocketAddress;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,7 +41,6 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -66,6 +63,8 @@ import org.apache.hadoop.yarn.server.api
import
org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import
org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
+import com.google.protobuf.BlockingService;
+
public class AdminService extends AbstractService implements
HAServiceProtocol, ResourceManagerAdministrationProtocol {
@@ -73,10 +72,6 @@ public class AdminService extends Abstra
private final RMContext rmContext;
private final ResourceManager rm;
- @VisibleForTesting
- protected HAServiceProtocol.HAServiceState
- haState = HAServiceProtocol.HAServiceState.INITIALIZING;
- boolean haEnabled;
private Server server;
private InetSocketAddress masterServiceAddress;
@@ -93,13 +88,6 @@ public class AdminService extends Abstra
@Override
public synchronized void serviceInit(Configuration conf) throws Exception {
- haEnabled = HAUtil.isHAEnabled(conf);
- if (haEnabled) {
- HAUtil.verifyAndSetConfiguration(conf);
- rm.setConf(conf);
- }
- rm.createAndInitActiveServices();
-
masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@@ -112,11 +100,6 @@ public class AdminService extends Abstra
@Override
protected synchronized void serviceStart() throws Exception {
- if (haEnabled) {
- transitionToStandby(true);
- } else {
- transitionToActive();
- }
startServer();
super.serviceStart();
}
@@ -124,8 +107,6 @@ public class AdminService extends Abstra
@Override
protected synchronized void serviceStop() throws Exception {
stopServer();
- transitionToStandby(false);
- haState = HAServiceState.STOPPING;
super.serviceStop();
}
@@ -145,7 +126,7 @@ public class AdminService extends Abstra
refreshServiceAcls(conf, new RMPolicyProvider());
}
- if (haEnabled) {
+ if (rmContext.isHAEnabled()) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
@@ -182,39 +163,27 @@ public class AdminService extends Abstra
}
private synchronized boolean isRMActive() {
- return HAServiceState.ACTIVE == haState;
+ return HAServiceState.ACTIVE == rmContext.getHAServiceState();
}
@Override
public synchronized void monitorHealth()
throws IOException {
checkAccess("monitorHealth");
- if (haState == HAServiceProtocol.HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
+ if (isRMActive() && !rm.areActiveServicesRunning()) {
throw new HealthCheckFailedException(
"Active ResourceManager services are not running!");
}
}
- synchronized void transitionToActive() throws Exception {
- if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
- LOG.info("Already in active state");
- return;
- }
-
- LOG.info("Transitioning to active");
- rm.startActiveServices();
- haState = HAServiceProtocol.HAServiceState.ACTIVE;
- LOG.info("Transitioned to active");
- }
-
@Override
- public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo)
- throws IOException {
+ public synchronized void transitionToActive(
+ HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
UserGroupInformation user = checkAccess("transitionToActive");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
- transitionToActive();
+ rm.transitionToActive();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
@@ -226,32 +195,14 @@ public class AdminService extends Abstra
}
}
- synchronized void transitionToStandby(boolean initialize)
- throws Exception {
- if (haState == HAServiceProtocol.HAServiceState.STANDBY) {
- LOG.info("Already in standby state");
- return;
- }
-
- LOG.info("Transitioning to standby");
- if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
- rm.stopActiveServices();
- if (initialize) {
- rm.createAndInitActiveServices();
- }
- }
- haState = HAServiceProtocol.HAServiceState.STANDBY;
- LOG.info("Transitioned to standby");
- }
-
@Override
- public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo)
- throws IOException {
+ public synchronized void transitionToStandby(
+ HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
UserGroupInformation user = checkAccess("transitionToStandby");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
- transitionToStandby(true);
+ rm.transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService");
} catch (Exception e) {
@@ -266,15 +217,15 @@ public class AdminService extends Abstra
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
+ HAServiceState haState = rmContext.getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
- if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState ==
- HAServiceProtocol.HAServiceState.STANDBY) {
+ if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
- }
+ }
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1550168&r1=1550167&r2=1550168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Wed Dec 11 15:15:35 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -42,7 +43,11 @@ import org.apache.hadoop.yarn.server.res
public interface RMContext {
Dispatcher getDispatcher();
-
+
+ boolean isHAEnabled();
+
+ HAServiceState getHAServiceState();
+
RMStateStore getStateStore();
ConcurrentMap<ApplicationId, RMApp> getRMApps();
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1550168&r1=1550167&r2=1550168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Wed Dec 11 15:15:35 2013
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.re
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -35,8 +37,8 @@ import org.apache.hadoop.yarn.server.res
import
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
-import
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import com.google.common.annotations.VisibleForTesting;
@@ -54,6 +56,10 @@ public class RMContextImpl implements RM
private final ConcurrentMap<String, RMNode> inactiveNodes
= new ConcurrentHashMap<String, RMNode>();
+ private boolean isHAEnabled;
+ private HAServiceState haServiceState =
+ HAServiceProtocol.HAServiceState.INITIALIZING;
+
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
@@ -211,6 +217,16 @@ public class RMContextImpl implements RM
return resourceTrackerService;
}
+ void setHAEnabled(boolean isHAEnabled) {
+ this.isHAEnabled = isHAEnabled;
+ }
+
+ void setHAServiceState(HAServiceState haServiceState) {
+ synchronized (haServiceState) {
+ this.haServiceState = haServiceState;
+ }
+ }
+
void setDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
@@ -290,4 +306,16 @@ public class RMContextImpl implements RM
ResourceTrackerService resourceTrackerService) {
this.resourceTrackerService = resourceTrackerService;
}
+
+ @Override
+ public boolean isHAEnabled() {
+ return isHAEnabled;
+ }
+
+ @Override
+ public HAServiceState getHAServiceState() {
+ synchronized (haServiceState) {
+ return haServiceState;
+ }
+ }
}
\ No newline at end of file
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1550168&r1=1550167&r2=1550168&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Wed Dec 11 15:15:35 2013
@@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpConfig.Policy;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.YarnUncaug
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -188,6 +191,12 @@ public class ResourceManager extends Com
addService(adminService);
rmContext.setRMAdminService(adminService);
+ this.rmContext.setHAEnabled(HAUtil.isHAEnabled(conf));
+ if (this.rmContext.isHAEnabled()) {
+ HAUtil.verifyAndSetConfiguration(conf);
+ }
+ createAndInitActiveServices();
+
super.serviceInit(conf);
}
@@ -217,9 +226,8 @@ public class ResourceManager extends Com
}
protected RMStateStoreOperationFailedEventDispatcher
- createRMStateStoreOperationFailedEventDispatcher() {
- return new RMStateStoreOperationFailedEventDispatcher(
- rmContext.getRMAdminService());
+ createRMStateStoreOperationFailedEventDispatcher() {
+ return new RMStateStoreOperationFailedEventDispatcher(rmContext, this);
}
protected Dispatcher createDispatcher() {
@@ -655,11 +663,14 @@ public class ResourceManager extends Com
@Private
public static class RMStateStoreOperationFailedEventDispatcher implements
EventHandler<RMStateStoreOperationFailedEvent> {
- private final AdminService adminService;
- public RMStateStoreOperationFailedEventDispatcher(
- AdminService adminService) {
- this.adminService = adminService;
+ private final RMContext rmContext;
+ private final ResourceManager rm;
+
+ public RMStateStoreOperationFailedEventDispatcher(RMContext rmContext,
+ ResourceManager resourceManager) {
+ this.rmContext = rmContext;
+ this.rm = resourceManager;
}
@Override
@@ -671,16 +682,14 @@ public class ResourceManager extends Com
}
if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
LOG.info("RMStateStore has been fenced");
- synchronized(adminService) {
- if (adminService.haEnabled) {
- try {
- // Transition to standby and reinit active services
- LOG.info("Transitioning RM to Standby mode");
- adminService.transitionToStandby(true);
- return;
- } catch (Exception e) {
- LOG.error("Failed to transition RM to Standby mode.");
- }
+ if (rmContext.isHAEnabled()) {
+ try {
+ // Transition to standby and reinit active services
+ LOG.info("Transitioning RM to Standby mode");
+ rm.transitionToStandby(true);
+ return;
+ } catch (Exception e) {
+ LOG.error("Failed to transition RM to Standby mode.");
}
}
}
@@ -826,10 +835,6 @@ public class ResourceManager extends Com
webApp = builder.start(new RMWebApp(this));
}
- void setConf(Configuration configuration) {
- conf = configuration;
- }
-
/**
* Helper method to create and init {@link #activeServices}. This creates an
* instance of {@link RMActiveServices} and initializes it.
@@ -870,6 +875,39 @@ public class ResourceManager extends Com
return activeServices != null && activeServices.isInState(STATE.STARTED);
}
+ synchronized void transitionToActive() throws Exception {
+ if (rmContext.getHAServiceState() ==
+ HAServiceProtocol.HAServiceState.ACTIVE) {
+ LOG.info("Already in active state");
+ return;
+ }
+
+ LOG.info("Transitioning to active state");
+ startActiveServices();
+ rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
+ LOG.info("Transitioned to active state");
+ }
+
+ synchronized void transitionToStandby(boolean initialize)
+ throws Exception {
+ if (rmContext.getHAServiceState() ==
+ HAServiceProtocol.HAServiceState.STANDBY) {
+ LOG.info("Already in standby state");
+ return;
+ }
+
+ LOG.info("Transitioning to standby state");
+ if (rmContext.getHAServiceState() ==
+ HAServiceProtocol.HAServiceState.ACTIVE) {
+ stopActiveServices();
+ if (initialize) {
+ createAndInitActiveServices();
+ }
+ }
+ rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
+ LOG.info("Transitioned to standby state");
+ }
+
@Override
protected void serviceStart() throws Exception {
try {
@@ -877,6 +915,13 @@ public class ResourceManager extends Com
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to login", ie);
}
+
+ if (this.rmContext.isHAEnabled()) {
+ transitionToStandby(true);
+ } else {
+ transitionToActive();
+ }
+
super.serviceStart();
}
@@ -888,6 +933,8 @@ public class ResourceManager extends Com
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
+ transitionToStandby(false);
+ rmContext.setHAServiceState(HAServiceState.STOPPING);
}
protected ResourceTrackerService createResourceTrackerService() {