Author: arp
Date: Wed Oct 23 02:34:44 2013
New Revision: 1534894
URL: http://svn.apache.org/r1534894
Log:
Merging r1534707 through r1534893 from trunk to branch HDFS-2832
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Wed Oct 23
02:34:44 2013
@@ -82,6 +82,12 @@ Release 2.3.0 - UNRELEASED
YARN-1300. SLS tests fail because conf puts YARN properties in
fair-scheduler.xml (Ted Yu via Sandy Ryza)
+ YARN-1183. MiniYARNCluster shutdown takes several minutes intermittently
+ (Andrey Klochkov via jeagles)
+
+ YARN-1305. RMHAProtocolService#serviceInit should handle HAUtil's
+ IllegalArgumentException (Tsuyoshi Ozawa via bikas)
+
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -130,6 +136,9 @@ Release 2.2.1 - UNRELEASED
YARN-1331. yarn.cmd exits with NoClassDefFoundError trying to run rmadmin
or
logs. (cnauroth)
+ YARN-1330. Fair Scheduler: defaultQueueSchedulingPolicy does not take
effect
+ (Sandy Ryza)
+
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
Wed Oct 23 02:34:44 2013
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.util.Arrays;
@@ -42,10 +43,13 @@ public class HAUtil {
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.RM_WEBAPP_ADDRESS));
+ public static final String BAD_CONFIG_MESSAGE_PREFIX =
+ "Invalid configuration! ";
+
private HAUtil() { /* Hidden constructor */ }
private static void throwBadConfigurationException(String msg) {
- throw new YarnRuntimeException("Invalid configuration! " + msg);
+ throw new YarnRuntimeException(BAD_CONFIG_MESSAGE_PREFIX + msg);
}
/**
@@ -59,29 +63,137 @@ public class HAUtil {
YarnConfiguration.DEFAULT_RM_HA_ENABLED);
}
+ /**
+ * Verify configuration for Resource Manager HA.
+ * @param conf Configuration
+ * @throws YarnRuntimeException
+ */
+ public static void verifyAndSetConfiguration(Configuration conf)
+ throws YarnRuntimeException {
+ verifyAndSetRMHAIds(conf);
+ verifyAndSetRMHAId(conf);
+ verifyAndSetAllRpcAddresses(conf);
+ }
+
+
+ private static void verifyAndSetRMHAIds(Configuration conf) {
+ Collection<String> ids =
+ conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
+ if (ids.size() <= 0) {
+ throwBadConfigurationException(
+ getInvalidValueMessage(YarnConfiguration.RM_HA_IDS,
+ conf.get(YarnConfiguration.RM_HA_IDS)));
+ } else if (ids.size() == 1) {
+ LOG.warn(getRMHAIdsWarningMessage(ids.toString()));
+ }
+
+ StringBuilder setValue = new StringBuilder();
+ for (String id: ids) {
+ setValue.append(id);
+ setValue.append(",");
+ }
+ conf.set(YarnConfiguration.RM_HA_IDS,
+ setValue.substring(0, setValue.length() - 1));
+ }
+
+ private static void verifyAndSetRMHAId(Configuration conf) {
+ String rmId = conf.getTrimmed(YarnConfiguration.RM_HA_ID);
+ if (rmId == null) {
+ throwBadConfigurationException(
+ getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID));
+ } else {
+ Collection<String> ids = getRMHAIds(conf);
+ if (!ids.contains(rmId)) {
+ throwBadConfigurationException(
+ getRMHAIdNeedToBeIncludedMessage(ids.toString(), rmId));
+ }
+ }
+ conf.set(YarnConfiguration.RM_HA_ID, rmId);
+ }
+
+ private static void verifyAndSetConfValue(String prefix, Configuration conf)
{
+ String confKey = null;
+ String confValue = null;
+ try {
+ confKey = getConfKeyForRMInstance(prefix, conf);
+ confValue = getConfValueForRMInstance(prefix, conf);
+ conf.set(prefix, confValue);
+ } catch (YarnRuntimeException yre) {
+ // Error at getRMHAId()
+ throw yre;
+ } catch (IllegalArgumentException iae) {
+ String errmsg;
+ if (confKey == null) {
+ // Error at addSuffix
+ errmsg = getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
+ getRMHAId(conf));
+ } else {
+ // Error at Configuration#set.
+ errmsg = getNeedToSetValueMessage(confKey);
+ }
+ throwBadConfigurationException(errmsg);
+ }
+ }
+
+ public static void verifyAndSetAllRpcAddresses(Configuration conf) {
+ for (String confKey : RPC_ADDRESS_CONF_KEYS) {
+ verifyAndSetConfValue(confKey, conf);
+ }
+ }
+
+ /**
+ * @param conf Configuration. Please use getRMHAIds to check.
+ * @return RM Ids on success
+ */
public static Collection<String> getRMHAIds(Configuration conf) {
- return conf.getTrimmedStringCollection(YarnConfiguration.RM_HA_IDS);
+ return conf.getStringCollection(YarnConfiguration.RM_HA_IDS);
}
/**
- * @param conf Configuration
+ * @param conf Configuration. Please use verifyAndSetRMHAId to check.
* @return RM Id on success
- * @throws YarnRuntimeException for configurations without a node id
*/
@VisibleForTesting
- public static String getRMHAId(Configuration conf) {
- String rmId = conf.get(YarnConfiguration.RM_HA_ID);
- if (rmId == null) {
- throwBadConfigurationException(YarnConfiguration.RM_HA_ID +
- " needs to be set in a HA configuration");
- }
- return rmId;
+ static String getRMHAId(Configuration conf) {
+ return conf.get(YarnConfiguration.RM_HA_ID);
+ }
+
+ @VisibleForTesting
+ static String getNeedToSetValueMessage(String confKey) {
+ return confKey + " needs to be set in a HA configuration.";
+ }
+
+ @VisibleForTesting
+ static String getInvalidValueMessage(String confKey,
+ String invalidValue){
+ return "Invalid value of " + confKey +". "
+ + "Current value is " + invalidValue;
+ }
+
+ @VisibleForTesting
+ static String getRMHAIdNeedToBeIncludedMessage(String ids,
+ String rmId) {
+ return YarnConfiguration.RM_HA_IDS + "("
+ + ids + ") need to contain " + YarnConfiguration.RM_HA_ID + "("
+ + rmId + ") in a HA configuration.";
+ }
+
+ @VisibleForTesting
+ static String getRMHAIdsWarningMessage(String ids) {
+ return "Resource Manager HA is enabled, but " +
+ YarnConfiguration.RM_HA_IDS + " has only one id(" +
+ ids.toString() + ")";
+ }
+
+ private static String getConfKeyForRMInstance(String prefix,
+ Configuration conf) {
+ return addSuffix(prefix, getRMHAId(conf));
}
private static String getConfValueForRMInstance(String prefix,
Configuration conf) {
- String confKey = addSuffix(prefix, getRMHAId(conf));
- String retVal = conf.get(confKey);
+ String confKey = getConfKeyForRMInstance(prefix, conf);
+ String retVal = conf.getTrimmed(confKey);
if (LOG.isTraceEnabled()) {
LOG.trace("getConfValueForRMInstance: prefix = " + prefix +
"; confKey being looked up = " + confKey +
@@ -96,16 +208,6 @@ public class HAUtil {
return (value == null) ? defaultValue : value;
}
- private static void setConfValue(String prefix, Configuration conf) {
- conf.set(prefix, getConfValueForRMInstance(prefix, conf));
- }
-
- public static void setAllRpcAddresses(Configuration conf) {
- for (String confKey : RPC_ADDRESS_CONF_KEYS) {
- setConfValue(confKey, conf);
- }
- }
-
/** Add non empty and non null suffix to a key */
@VisibleForTesting
public static String addSuffix(String key, String suffix) {
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestHAUtil.java
Wed Oct 23 02:34:44 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.conf;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Before;
import org.junit.Test;
@@ -27,53 +28,134 @@ import org.junit.Test;
import java.util.Collection;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
public class TestHAUtil {
private Configuration conf;
- private static final String RM1_ADDRESS = "1.2.3.4:8021";
+ private static final String RM1_ADDRESS_UNTRIMMED = " \t\t\n 1.2.3.4:8021
\n\t ";
+ private static final String RM1_ADDRESS = RM1_ADDRESS_UNTRIMMED.trim();
private static final String RM2_ADDRESS = "localhost:8022";
- private static final String RM1_NODE_ID = "rm1";
+ private static final String RM1_NODE_ID_UNTRIMMED = "rm1 ";
+ private static final String RM1_NODE_ID = RM1_NODE_ID_UNTRIMMED.trim();
private static final String RM2_NODE_ID = "rm2";
+ private static final String RM3_NODE_ID = "rm3";
+ private static final String RM_INVALID_NODE_ID = ".rm";
+ private static final String RM_NODE_IDS_UNTRIMMED = RM1_NODE_ID_UNTRIMMED +
"," + RM2_NODE_ID;
+ private static final String RM_NODE_IDS = RM1_NODE_ID + "," + RM2_NODE_ID;
@Before
public void setUp() {
conf = new Configuration();
- conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
- conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
+ conf.set(YarnConfiguration.RM_HA_IDS, RM_NODE_IDS_UNTRIMMED);
+ conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
- conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
+ // configuration key itself cannot contains space/tab/return chars.
+ conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
}
}
@Test
public void testGetRMServiceId() throws Exception {
+ conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
Collection<String> rmhaIds = HAUtil.getRMHAIds(conf);
assertEquals(2, rmhaIds.size());
+
+ String[] ids = rmhaIds.toArray(new String[0]);
+ assertEquals(RM1_NODE_ID, ids[0]);
+ assertEquals(RM2_NODE_ID, ids[1]);
}
@Test
public void testGetRMId() throws Exception {
+ conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
assertEquals("Does not honor " + YarnConfiguration.RM_HA_ID,
- RM1_NODE_ID, HAUtil.getRMHAId(conf));
- conf = new YarnConfiguration();
- try {
- HAUtil.getRMHAId(conf);
- fail("getRMHAId() fails to throw an exception when RM_HA_ID is not set");
- } catch (YarnRuntimeException yre) {
- // do nothing
- }
+ RM1_NODE_ID, HAUtil.getRMHAId(conf));
+
+ conf.clear();
+ assertNull("Return null when " + YarnConfiguration.RM_HA_ID
+ + " is not set", HAUtil.getRMHAId(conf));
}
@Test
- public void testSetGetRpcAddresses() throws Exception {
- HAUtil.setAllRpcAddresses(conf);
+ public void testVerifyAndSetConfiguration() throws Exception {
+ try {
+ HAUtil.verifyAndSetConfiguration(conf);
+ } catch (YarnRuntimeException e) {
+ fail("Should not throw any exceptions.");
+ }
+
+ assertEquals("Should be saved as Trimmed collection",
+ StringUtils.getStringCollection(RM_NODE_IDS), HAUtil.getRMHAIds(conf));
+ assertEquals("Should be saved as Trimmed string",
+ RM1_NODE_ID, HAUtil.getRMHAId(conf));
for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
assertEquals("RPC address not set for " + confKey,
- RM1_ADDRESS, conf.get(confKey));
+ RM1_ADDRESS, conf.get(confKey));
+ }
+
+ conf.clear();
+ conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
+ try {
+ HAUtil.verifyAndSetConfiguration(conf);
+ } catch (YarnRuntimeException e) {
+ assertEquals("YarnRuntimeException by getRMId()",
+ HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
+ HAUtil.getNeedToSetValueMessage(YarnConfiguration.RM_HA_ID),
+ e.getMessage());
+ }
+
+ conf.clear();
+ conf.set(YarnConfiguration.RM_HA_ID, RM_INVALID_NODE_ID);
+ conf.set(YarnConfiguration.RM_HA_IDS, RM_INVALID_NODE_ID);
+ for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
+ // simulate xml with invalid node id
+ conf.set(confKey + RM_INVALID_NODE_ID, RM_INVALID_NODE_ID);
+ }
+ try {
+ HAUtil.verifyAndSetConfiguration(conf);
+ } catch (YarnRuntimeException e) {
+ assertEquals("YarnRuntimeException by addSuffix()",
+ HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
+ HAUtil.getInvalidValueMessage(YarnConfiguration.RM_HA_ID,
+ RM_INVALID_NODE_ID),
+ e.getMessage());
+ }
+
+ conf.clear();
+ // simulate the case HAUtil.RPC_ADDRESS_CONF_KEYS are not set
+ conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID);
+ conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID);
+ try {
+ HAUtil.verifyAndSetConfiguration(conf);
+ fail("Should throw YarnRuntimeException. by Configuration#set()");
+ } catch (YarnRuntimeException e) {
+ String confKey =
+ HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID);
+ assertEquals("YarnRuntimeException by Configuration#set()",
+ HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
HAUtil.getNeedToSetValueMessage(confKey),
+ e.getMessage());
+ }
+
+ // simulate the case YarnConfiguration.RM_HA_IDS doesn't contain
+ // the value of YarnConfiguration.RM_HA_ID
+ conf.clear();
+ conf.set(YarnConfiguration.RM_HA_IDS, RM2_NODE_ID + "," + RM3_NODE_ID);
+ conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID_UNTRIMMED);
+ for (String confKey : HAUtil.RPC_ADDRESS_CONF_KEYS) {
+ conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS_UNTRIMMED);
+ conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
+ }
+ try {
+ HAUtil.verifyAndSetConfiguration(conf);
+ } catch (YarnRuntimeException e) {
+ assertEquals("YarnRuntimeException by getRMId()'s validation",
+ HAUtil.BAD_CONFIG_MESSAGE_PREFIX +
+ HAUtil.getRMHAIdNeedToBeIncludedMessage("[rm2, rm3]", RM1_NODE_ID),
+ e.getMessage());
}
}
}
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java
Wed Oct 23 02:34:44 2013
@@ -57,7 +57,7 @@ public class RMHAProtocolService extends
this.conf = conf;
haEnabled = HAUtil.isHAEnabled(this.conf);
if (haEnabled) {
- HAUtil.setAllRpcAddresses(this.conf);
+ HAUtil.verifyAndSetConfiguration(conf);
rm.setConf(this.conf);
}
rm.createAndInitActiveServices();
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
Wed Oct 23 02:34:44 2013
@@ -378,22 +378,24 @@ public class QueueManager {
queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout,
defaultMinSharePreemptionTimeout);
- // Update metrics
+ // Make sure all queues exist
+ for (String name: queueNamesInAllocFile) {
+ getLeafQueue(name, true);
+ }
+
for (FSQueue queue : queues.values()) {
+ // Update queue metrics
FSQueueMetrics queueMetrics = queue.getMetrics();
queueMetrics.setMinShare(queue.getMinShare());
queueMetrics.setMaxShare(queue.getMaxShare());
+ // Set scheduling policies
+ if (queuePolicies.containsKey(queue.getName())) {
+ queue.setPolicy(queuePolicies.get(queue.getName()));
+ } else {
+ queue.setPolicy(SchedulingPolicy.getDefault());
+ }
}
- // Create all queus
- for (String name: queueNamesInAllocFile) {
- getLeafQueue(name, true);
- }
-
- // Set custom policies as specified
- for (Map.Entry<String, SchedulingPolicy> entry :
queuePolicies.entrySet()) {
- queues.get(entry.getKey()).setPolicy(entry.getValue());
- }
}
}
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Wed Oct 23 02:34:44 2013
@@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.res
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
@@ -807,6 +808,7 @@ public class TestFairScheduler {
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
+ out.println("<schedulingPolicy>fair</schedulingPolicy>");
out.println("</queue>");
// Give queue C no minimum
out.println("<queue name=\"queueC\">");
@@ -833,6 +835,8 @@ public class TestFairScheduler {
+ "</defaultMinSharePreemptionTimeout>");
// Set fair share preemption timeout to 5 minutes
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
+ // Set default scheduling policy to DRF
+
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
@@ -894,6 +898,18 @@ public class TestFairScheduler {
assertEquals(120000,
queueManager.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(60000,
queueManager.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
+
+ // Verify existing queues have default scheduling policy
+ assertEquals(DominantResourceFairnessPolicy.NAME,
+ queueManager.getQueue("root").getPolicy().getName());
+ assertEquals(DominantResourceFairnessPolicy.NAME,
+ queueManager.getQueue("root.queueA").getPolicy().getName());
+ // Verify default is overriden if specified explicitly
+ assertEquals(FairSharePolicy.NAME,
+ queueManager.getQueue("root.queueB").getPolicy().getName());
+ // Verify new queue gets default scheduling policy
+ assertEquals(DominantResourceFairnessPolicy.NAME,
+ queueManager.getLeafQueue("root.newqueue",
true).getPolicy().getName());
}
@Test
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1534894&r1=1534893&r2=1534894&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Wed Oct 23 02:34:44 2013
@@ -22,6 +22,8 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,8 +36,10 @@ import org.apache.hadoop.service.Abstrac
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -52,6 +56,10 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
/**
@@ -83,6 +91,9 @@ public class MiniYARNCluster extends Com
private ResourceManagerWrapper resourceManagerWrapper;
+ private ConcurrentMap<ApplicationAttemptId, Long> appMasters =
+ new ConcurrentHashMap<ApplicationAttemptId, Long>(16, 0.75f, 2);
+
private File testWorkDir;
// Number of nm-local-dirs per nodemanager
@@ -210,6 +221,16 @@ public class MiniYARNCluster extends Com
};
};
resourceManager.init(conf);
+
resourceManager.getRMContext().getDispatcher().register(RMAppAttemptEventType.class,
+ new EventHandler<RMAppAttemptEvent>() {
+ public void handle(RMAppAttemptEvent event) {
+ if (event instanceof RMAppAttemptRegistrationEvent) {
+ appMasters.put(event.getApplicationAttemptId(),
event.getTimestamp());
+ } else if (event instanceof RMAppAttemptUnregistrationEvent) {
+ appMasters.remove(event.getApplicationAttemptId());
+ }
+ }
+ });
super.serviceInit(conf);
}
@@ -243,9 +264,22 @@ public class MiniYARNCluster extends Com
WebAppUtils.getRMWebAppURLWithoutScheme(getConfig()));
}
+ private void waitForAppMastersToFinish(long timeoutMillis) throws
InterruptedException {
+ long started = System.currentTimeMillis();
+ synchronized (appMasters) {
+ while (!appMasters.isEmpty() && System.currentTimeMillis() - started <
timeoutMillis) {
+ appMasters.wait(1000);
+ }
+ }
+ if (!appMasters.isEmpty()) {
+ LOG.warn("Stopping RM while some app masters are still alive");
+ }
+ }
+
@Override
protected synchronized void serviceStop() throws Exception {
if (resourceManager != null) {
+ waitForAppMastersToFinish(5000);
resourceManager.stop();
}
super.serviceStop();