Author: kasha
Date: Fri Dec 6 00:10:03 2013
New Revision: 1548333
URL: http://svn.apache.org/r1548333
Log:
YARN-1181. Augment MiniYARNCluster to support HA mode (kasha)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
- copied unchanged from r1548330,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1548333&r1=1548332&r2=1548333&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Fri Dec 6 00:10:03 2013
@@ -123,6 +123,8 @@ Release 2.4.0 - UNRELEASED
YARN-1403. Separate out configuration loading from QueueManager in the Fair
Scheduler (Sandy Ryza)
+ YARN-1181. Augment MiniYARNCluster to support HA mode (Karthik Kambatla)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1548333&r1=1548332&r2=1548333&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Fri Dec 6 00:10:03 2013
@@ -25,18 +25,21 @@ import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -87,7 +90,7 @@ public class MiniYARNCluster extends Com
}
private NodeManager[] nodeManagers;
- private ResourceManager resourceManager;
+ private ResourceManager[] resourceManagers;
private ResourceManagerWrapper resourceManagerWrapper;
@@ -103,12 +106,14 @@ public class MiniYARNCluster extends Com
/**
* @param testName name of the test
- * @param noOfNodeManagers the number of node managers in the cluster
+ * @param numResourceManagers the number of resource managers in the cluster
+ * @param numNodeManagers the number of node managers in the cluster
* @param numLocalDirs the number of nm-local-dirs per nodemanager
* @param numLogDirs the number of nm-log-dirs per nodemanager
*/
- public MiniYARNCluster(String testName, int noOfNodeManagers,
- int numLocalDirs, int numLogDirs) {
+ public MiniYARNCluster(
+ String testName, int numResourceManagers, int numNodeManagers,
+ int numLocalDirs, int numLogDirs) {
super(testName.replace("$", ""));
this.numLocalDirs = numLocalDirs;
this.numLogDirs = numLogDirs;
@@ -157,28 +162,103 @@ public class MiniYARNCluster extends Com
this.testWorkDir = targetWorkDir;
}
- resourceManagerWrapper = new ResourceManagerWrapper();
- addService(resourceManagerWrapper);
- nodeManagers = new CustomNodeManager[noOfNodeManagers];
- for(int index = 0; index < noOfNodeManagers; index++) {
+ resourceManagers = new ResourceManager[numResourceManagers];
+ for (int i = 0; i < numResourceManagers; i++) {
+ resourceManagers[i] = new ResourceManager();
+ addService(new ResourceManagerWrapper(i));
+ }
+ nodeManagers = new CustomNodeManager[numNodeManagers];
+ for(int index = 0; index < numNodeManagers; index++) {
addService(new NodeManagerWrapper(index));
nodeManagers[index] = new CustomNodeManager();
}
}
-
- @Override
+
+ /**
+ * @param testName name of the test
+ * @param numNodeManagers the number of node managers in the cluster
+ * @param numLocalDirs the number of nm-local-dirs per nodemanager
+ * @param numLogDirs the number of nm-log-dirs per nodemanager
+ */
+ public MiniYARNCluster(String testName, int numNodeManagers,
+ int numLocalDirs, int numLogDirs) {
+ this(testName, 1, numNodeManagers, numLocalDirs, numLogDirs);
+ }
+
+ @Override
public void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf instanceof YarnConfiguration ? conf
- : new YarnConfiguration(
- conf));
+ if (resourceManagers.length > 1) {
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+
+ StringBuilder rmIds = new StringBuilder();
+ for (int i = 0; i < resourceManagers.length; i++) {
+ if (i != 0) {
+ rmIds.append(",");
+ }
+ rmIds.append("rm" + i);
+ }
+ conf.set(YarnConfiguration.RM_HA_IDS, rmIds.toString());
+ }
+ super.serviceInit(
+ conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
}
public File getTestWorkDir() {
return testWorkDir;
}
+ /**
+ * In a HA cluster, go through all the RMs and find the Active RM. If none
+ * of them are active, wait up to 5 seconds for them to transition to Active.
+ *
+ * In a non-HA cluster, return the index of the only RM.
+ *
+ * @return index of the active RM
+ */
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ int getActiveRMIndex() {
+ if (resourceManagers.length == 1) {
+ return 0;
+ }
+
+ int numRetriesForRMBecomingActive = 5;
+ while (numRetriesForRMBecomingActive-- > 0) {
+ for (int i = 0; i < resourceManagers.length; i++) {
+ try {
+ if (HAServiceProtocol.HAServiceState.ACTIVE ==
+ resourceManagers[i].getRMContext().getRMAdminService()
+ .getServiceStatus().getState()) {
+ return i;
+ }
+ } catch (IOException e) {
+ throw new YarnRuntimeException("Couldn't read the status of " +
+ "a ResourceManger in the HA ensemble.", e);
+ }
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new YarnRuntimeException("Interrupted while waiting for one " +
+ "of the ResourceManagers to become active");
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * @return the active {@link ResourceManager} of the cluster,
+ * null if none of them are active.
+ */
public ResourceManager getResourceManager() {
- return this.resourceManager;
+ int activeRMIndex = getActiveRMIndex();
+ return activeRMIndex == -1
+ ? null
+ : this.resourceManagers[getActiveRMIndex()];
+ }
+
+ public ResourceManager getResourceManager(int i) {
+ return this.resourceManagers[i];
}
public NodeManager getNodeManager(int i) {
@@ -195,8 +275,29 @@ public class MiniYARNCluster extends Com
}
private class ResourceManagerWrapper extends AbstractService {
- public ResourceManagerWrapper() {
- super(ResourceManagerWrapper.class.getName());
+ private int index;
+
+ public ResourceManagerWrapper(int i) {
+ super(ResourceManagerWrapper.class.getName() + "_" + i);
+ index = i;
+ }
+
+ private void setNonHARMConfiguration(Configuration conf) {
+ String hostname = MiniYARNCluster.getHostname();
+ conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
+ conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
+ conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
+ conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
+ WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
+ }
+
+ private void setHARMConfiguration(Configuration conf) {
+ String rmId = "rm" + index;
+ String hostname = MiniYARNCluster.getHostname();
+ conf.set(YarnConfiguration.RM_HA_ID, rmId);
+ for (String confKey : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
+ conf.set(HAUtil.addSuffix(confKey, rmId), hostname + ":0");
+ }
}
@Override
@@ -206,22 +307,15 @@ public class MiniYARNCluster extends Com
if (!conf.getBoolean(
YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
- // pick free random ports.
- String hostname = MiniYARNCluster.getHostname();
- conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
- conf.set(YarnConfiguration.RM_ADMIN_ADDRESS, hostname + ":0");
- conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, hostname + ":0");
- conf.set(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, hostname + ":0");
- WebAppUtils.setRMWebAppHostnameAndPort(conf, hostname, 0);
+ if (HAUtil.isHAEnabled(conf)) {
+ setHARMConfiguration(conf);
+ } else {
+ setNonHARMConfiguration(conf);
+ }
}
- resourceManager = new ResourceManager() {
- @Override
- protected void doSecureLogin() throws IOException {
- // Don't try to login using keytab in the testcase.
- };
- };
- resourceManager.init(conf);
- resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class,
+ resourceManagers[index].init(conf);
+ resourceManagers[index].getRMContext().getDispatcher().register
+ (RMAppAttemptEventType.class,
new EventHandler<RMAppAttemptEvent>() {
public void handle(RMAppAttemptEvent event) {
if (event instanceof RMAppAttemptRegistrationEvent) {
@@ -239,20 +333,20 @@ public class MiniYARNCluster extends Com
try {
new Thread() {
public void run() {
- resourceManager.start();
- };
+ resourceManagers[index].start();
+ }
}.start();
int waitCount = 0;
- while (resourceManager.getServiceState() == STATE.INITED
+ while (resourceManagers[index].getServiceState() == STATE.INITED
&& waitCount++ < 60) {
LOG.info("Waiting for RM to start...");
Thread.sleep(1500);
}
- if (resourceManager.getServiceState() != STATE.STARTED) {
+ if (resourceManagers[index].getServiceState() != STATE.STARTED) {
// RM could have failed.
throw new IOException(
"ResourceManager failed to start. Final state is "
- + resourceManager.getServiceState());
+ + resourceManagers[index].getServiceState());
}
super.serviceStart();
} catch (Throwable t) {
@@ -278,9 +372,9 @@ public class MiniYARNCluster extends Com
@Override
protected synchronized void serviceStop() throws Exception {
- if (resourceManager != null) {
+ if (resourceManagers[index] != null) {
waitForAppMastersToFinish(5000);
- resourceManager.stop();
+ resourceManagers[index].stop();
}
super.serviceStop();
@@ -372,7 +466,7 @@ public class MiniYARNCluster extends Com
new Thread() {
public void run() {
nodeManagers[index].start();
- };
+ }
}.start();
int waitCount = 0;
while (nodeManagers[index].getServiceState() == STATE.INITED
@@ -398,12 +492,12 @@ public class MiniYARNCluster extends Com
super.serviceStop();
}
}
-
+
private class CustomNodeManager extends NodeManager {
@Override
protected void doSecureLogin() throws IOException {
// Don't try to login using keytab in the testcase.
- };
+ }
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@@ -412,8 +506,8 @@ public class MiniYARNCluster extends Com
healthChecker, metrics) {
@Override
protected ResourceTracker getRMClient() {
- final ResourceTrackerService rt = resourceManager
- .getResourceTrackerService();
+ final ResourceTrackerService rt =
+ getResourceManager().getResourceTrackerService();
final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -424,8 +518,7 @@ public class MiniYARNCluster extends Com
public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatRequest request) throws YarnException,
IOException {
- NodeHeartbeatResponse response = recordFactory.newRecordInstance(
- NodeHeartbeatResponse.class);
+ NodeHeartbeatResponse response;
try {
response = rt.nodeHeartbeat(request);
} catch (YarnException e) {
@@ -440,8 +533,7 @@ public class MiniYARNCluster extends Com
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request)
throws YarnException, IOException {
- RegisterNodeManagerResponse response = recordFactory.
- newRecordInstance(RegisterNodeManagerResponse.class);
+ RegisterNodeManagerResponse response;
try {
response = rt.registerNodeManager(request);
} catch (YarnException e) {
@@ -452,13 +544,11 @@ public class MiniYARNCluster extends Com
return response;
}
};
- };
+ }
@Override
- protected void stopRMProxy() {
- return;
- }
+ protected void stopRMProxy() { }
};
- };
+ }
}
}