Author: arp
Date: Wed Dec 11 23:18:29 2013
New Revision: 1550313
URL: http://svn.apache.org/r1550313
Log:
Merging r1549949 through r1550312 from trunk to branch HDFS-2832
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLinuxResourceCalculatorPlugin.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsBasedProcessTree.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsResourceCalculatorPlugin.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestYarnVersionInfo.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Wed Dec 11
23:18:29 2013
@@ -154,6 +154,14 @@ Release 2.4.0 - UNRELEASED
YARN-1378. Implemented a cleaner of old finished applications from the RM
state-store. (Jian He via vinodkv)
+ YARN-1481. Move internal services logic from AdminService to ResourceManager.
+ (vinodkv via kasha)
+
+ YARN-1491. Upgrade JUnit3 TestCase to JUnit 4 (Chen He via jeagles)
+
+ YARN-408. Change CapacityScheduler to not disable delay-scheduling by default.
+ (Mayank Bansal via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLinuxResourceCalculatorPlugin.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLinuxResourceCalculatorPlugin.java?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLinuxResourceCalculatorPlugin.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLinuxResourceCalculatorPlugin.java
Wed Dec 11 23:18:29 2013
@@ -23,16 +23,16 @@ import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;
-import junit.framework.TestCase;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
/**
* A JUnit test to test {@link LinuxResourceCalculatorPlugin}
* Create the fake /proc/ information and verify the parsing and calculation
*/
-public class TestLinuxResourceCalculatorPlugin extends TestCase {
+public class TestLinuxResourceCalculatorPlugin {
/**
* LinuxResourceCalculatorPlugin with a fake timer
*/
@@ -145,7 +145,7 @@ public class TestLinuxResourceCalculator
* @throws IOException
*/
@Test
- public void testParsingProcStatAndCpuFile() throws IOException {
+ public void parsingProcStatAndCpuFile() throws IOException {
// Write fake /proc/cpuinfo file.
long numProcessors = 8;
long cpuFrequencyKHz = 2392781;
@@ -171,7 +171,7 @@ public class TestLinuxResourceCalculator
updateStatFile(uTime, nTime, sTime);
assertEquals(plugin.getCumulativeCpuTime(),
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
- assertEquals(plugin.getCpuUsage(),
(float)(LinuxResourceCalculatorPlugin.UNAVAILABLE));
+ assertEquals(plugin.getCpuUsage(),
(float)(LinuxResourceCalculatorPlugin.UNAVAILABLE),0.0);
// Advance the time and sample again to test the CPU usage calculation
uTime += 100L;
@@ -179,13 +179,13 @@ public class TestLinuxResourceCalculator
updateStatFile(uTime, nTime, sTime);
assertEquals(plugin.getCumulativeCpuTime(),
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
- assertEquals(plugin.getCpuUsage(), 6.25F);
+ assertEquals(plugin.getCpuUsage(), 6.25F, 0.0);
// Advance the time and sample again. This time, we call getCpuUsage()
only.
uTime += 600L;
plugin.advanceTime(300L);
updateStatFile(uTime, nTime, sTime);
- assertEquals(plugin.getCpuUsage(), 25F);
+ assertEquals(plugin.getCpuUsage(), 25F, 0.0);
// Advance very short period of time (one jiffy length).
// In this case, CPU usage should not be updated.
@@ -194,7 +194,7 @@ public class TestLinuxResourceCalculator
updateStatFile(uTime, nTime, sTime);
assertEquals(plugin.getCumulativeCpuTime(),
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
- assertEquals(plugin.getCpuUsage(), 25F); // CPU usage is not updated.
+ assertEquals(plugin.getCpuUsage(), 25F, 0.0); // CPU usage is not updated.
}
/**
@@ -212,7 +212,7 @@ public class TestLinuxResourceCalculator
* @throws IOException
*/
@Test
- public void testParsingProcMemFile() throws IOException {
+ public void parsingProcMemFile() throws IOException {
long memTotal = 4058864L;
long memFree = 99632L;
long inactive = 567732L;
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsBasedProcessTree.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsBasedProcessTree.java?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsBasedProcessTree.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsBasedProcessTree.java
Wed Dec 11 23:18:29 2013
@@ -22,10 +22,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.Shell;
-import junit.framework.TestCase;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
-public class TestWindowsBasedProcessTree extends TestCase {
+public class TestWindowsBasedProcessTree {
private static final Log LOG = LogFactory
.getLog(TestWindowsBasedProcessTree.class);
@@ -41,7 +41,7 @@ public class TestWindowsBasedProcessTree
}
@Test (timeout = 30000)
- public void testTree() {
+ public void tree() {
if( !Shell.WINDOWS) {
LOG.info("Platform not Windows. Not testing");
return;
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsResourceCalculatorPlugin.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsResourceCalculatorPlugin.java?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsResourceCalculatorPlugin.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestWindowsResourceCalculatorPlugin.java
Wed Dec 11 23:18:29 2013
@@ -18,10 +18,10 @@
package org.apache.hadoop.yarn.util;
-import junit.framework.TestCase;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
-public class TestWindowsResourceCalculatorPlugin extends TestCase {
+public class TestWindowsResourceCalculatorPlugin {
class WindowsResourceCalculatorPluginTester extends
WindowsResourceCalculatorPlugin {
@@ -33,7 +33,7 @@ public class TestWindowsResourceCalculat
}
@Test (timeout = 30000)
- public void testParseSystemInfoString() {
+ public void parseSystemInfoString() {
WindowsResourceCalculatorPluginTester tester = new
WindowsResourceCalculatorPluginTester();
// info str derived from windows shell command has \r\n termination
tester.infoStr =
"17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n";
@@ -51,7 +51,7 @@ public class TestWindowsResourceCalculat
}
@Test (timeout = 20000)
- public void testRefreshAndCpuUsage() throws InterruptedException {
+ public void refreshAndCpuUsage() throws InterruptedException {
WindowsResourceCalculatorPluginTester tester = new
WindowsResourceCalculatorPluginTester();
// info str derived from windows shell command has \r\n termination
tester.infoStr =
"17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n";
@@ -75,7 +75,7 @@ public class TestWindowsResourceCalculat
}
@Test (timeout = 20000)
- public void testErrorInGetSystemInfo() {
+ public void errorInGetSystemInfo() {
WindowsResourceCalculatorPluginTester tester = new
WindowsResourceCalculatorPluginTester();
// info str derived from windows shell command has \r\n termination
tester.infoStr = null;
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestYarnVersionInfo.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestYarnVersionInfo.java?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestYarnVersionInfo.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestYarnVersionInfo.java
Wed Dec 11 23:18:29 2013
@@ -18,23 +18,23 @@
package org.apache.hadoop.yarn.util;
-import junit.framework.TestCase;
import java.io.IOException;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.junit.Test;
-
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
/**
* A JUnit test to test {@link YarnVersionInfo}
*/
-public class TestYarnVersionInfo extends TestCase {
+public class TestYarnVersionInfo {
/**
* Test the yarn version info routines.
* @throws IOException
*/
@Test
- public void testVersionInfoGenerated() throws IOException {
+ public void versionInfoGenerated() throws IOException {
// can't easily know what the correct values are going to be so just
// make sure they aren't Unknown
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml
Wed Dec 11 23:18:29 2013
@@ -99,12 +99,12 @@
<property>
<name>yarn.scheduler.capacity.node-locality-delay</name>
- <value>-1</value>
+ <value>40</value>
<description>
Number of missed scheduling opportunities after which the
CapacityScheduler
attempts to schedule rack-local containers.
- Typically this should be set to number of racks in the cluster, this
- feature is disabled by default, set to -1.
+ Typically this should be set to the number of nodes in the cluster. By
+ default it is set to approximately the number of nodes in one rack, which is 40.
</description>
</property>
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
Wed Dec 11 23:18:29 2013
@@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.net.InetSocketAddress;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -43,7 +41,6 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -66,6 +63,8 @@ import org.apache.hadoop.yarn.server.api
import
org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import
org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
+import com.google.protobuf.BlockingService;
+
public class AdminService extends AbstractService implements
HAServiceProtocol, ResourceManagerAdministrationProtocol {
@@ -73,10 +72,6 @@ public class AdminService extends Abstra
private final RMContext rmContext;
private final ResourceManager rm;
- @VisibleForTesting
- protected HAServiceProtocol.HAServiceState
- haState = HAServiceProtocol.HAServiceState.INITIALIZING;
- boolean haEnabled;
private Server server;
private InetSocketAddress masterServiceAddress;
@@ -93,13 +88,6 @@ public class AdminService extends Abstra
@Override
public synchronized void serviceInit(Configuration conf) throws Exception {
- haEnabled = HAUtil.isHAEnabled(conf);
- if (haEnabled) {
- HAUtil.verifyAndSetConfiguration(conf);
- rm.setConf(conf);
- }
- rm.createAndInitActiveServices();
-
masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@@ -112,11 +100,6 @@ public class AdminService extends Abstra
@Override
protected synchronized void serviceStart() throws Exception {
- if (haEnabled) {
- transitionToStandby(true);
- } else {
- transitionToActive();
- }
startServer();
super.serviceStart();
}
@@ -124,8 +107,6 @@ public class AdminService extends Abstra
@Override
protected synchronized void serviceStop() throws Exception {
stopServer();
- transitionToStandby(false);
- haState = HAServiceState.STOPPING;
super.serviceStop();
}
@@ -145,7 +126,7 @@ public class AdminService extends Abstra
refreshServiceAcls(conf, new RMPolicyProvider());
}
- if (haEnabled) {
+ if (rmContext.isHAEnabled()) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
@@ -182,39 +163,27 @@ public class AdminService extends Abstra
}
private synchronized boolean isRMActive() {
- return HAServiceState.ACTIVE == haState;
+ return HAServiceState.ACTIVE == rmContext.getHAServiceState();
}
@Override
public synchronized void monitorHealth()
throws IOException {
checkAccess("monitorHealth");
- if (haState == HAServiceProtocol.HAServiceState.ACTIVE &&
!rm.areActiveServicesRunning()) {
+ if (isRMActive() && !rm.areActiveServicesRunning()) {
throw new HealthCheckFailedException(
"Active ResourceManager services are not running!");
}
}
- synchronized void transitionToActive() throws Exception {
- if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
- LOG.info("Already in active state");
- return;
- }
-
- LOG.info("Transitioning to active");
- rm.startActiveServices();
- haState = HAServiceProtocol.HAServiceState.ACTIVE;
- LOG.info("Transitioned to active");
- }
-
@Override
- public synchronized void
transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo)
- throws IOException {
+ public synchronized void transitionToActive(
+ HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
UserGroupInformation user = checkAccess("transitionToActive");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
- transitionToActive();
+ rm.transitionToActive();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
@@ -226,32 +195,14 @@ public class AdminService extends Abstra
}
}
- synchronized void transitionToStandby(boolean initialize)
- throws Exception {
- if (haState == HAServiceProtocol.HAServiceState.STANDBY) {
- LOG.info("Already in standby state");
- return;
- }
-
- LOG.info("Transitioning to standby");
- if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
- rm.stopActiveServices();
- if (initialize) {
- rm.createAndInitActiveServices();
- }
- }
- haState = HAServiceProtocol.HAServiceState.STANDBY;
- LOG.info("Transitioned to standby");
- }
-
@Override
- public synchronized void
transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo)
- throws IOException {
+ public synchronized void transitionToStandby(
+ HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
UserGroupInformation user = checkAccess("transitionToStandby");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
- transitionToStandby(true);
+ rm.transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService");
} catch (Exception e) {
@@ -266,15 +217,15 @@ public class AdminService extends Abstra
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
+ HAServiceState haState = rmContext.getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
- if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState ==
- HAServiceProtocol.HAServiceState.STANDBY) {
+ if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
- }
+ }
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
Wed Dec 11 23:18:29 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -42,7 +43,11 @@ import org.apache.hadoop.yarn.server.res
public interface RMContext {
Dispatcher getDispatcher();
-
+
+ boolean isHAEnabled();
+
+ HAServiceState getHAServiceState();
+
RMStateStore getStateStore();
ConcurrentMap<ApplicationId, RMApp> getRMApps();
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
Wed Dec 11 23:18:29 2013
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.re
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -35,8 +37,8 @@ import org.apache.hadoop.yarn.server.res
import
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import
org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
-import
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import
org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import
org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import
org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import com.google.common.annotations.VisibleForTesting;
@@ -54,6 +56,10 @@ public class RMContextImpl implements RM
private final ConcurrentMap<String, RMNode> inactiveNodes
= new ConcurrentHashMap<String, RMNode>();
+ private boolean isHAEnabled;
+ private HAServiceState haServiceState =
+ HAServiceProtocol.HAServiceState.INITIALIZING;
+
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
@@ -211,6 +217,16 @@ public class RMContextImpl implements RM
return resourceTrackerService;
}
+ void setHAEnabled(boolean isHAEnabled) {
+ this.isHAEnabled = isHAEnabled;
+ }
+
+ void setHAServiceState(HAServiceState haServiceState) {
+ synchronized (this) { // lock the context itself: the field is reassigned here and enum constants are JVM-wide singletons
+ this.haServiceState = haServiceState;
+ }
+ }
+
void setDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
@@ -290,4 +306,16 @@ public class RMContextImpl implements RM
ResourceTrackerService resourceTrackerService) {
this.resourceTrackerService = resourceTrackerService;
}
+
+ @Override
+ public boolean isHAEnabled() {
+ return isHAEnabled;
+ }
+
+ @Override
+ public HAServiceState getHAServiceState() {
+ synchronized (this) { // same monitor as setHAServiceState, so readers see the latest state
+ return haServiceState;
+ }
+ }
}
\ No newline at end of file
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
Wed Dec 11 23:18:29 2013
@@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpConfig.Policy;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.YarnUncaug
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -188,6 +191,12 @@ public class ResourceManager extends Com
addService(adminService);
rmContext.setRMAdminService(adminService);
+ this.rmContext.setHAEnabled(HAUtil.isHAEnabled(conf));
+ if (this.rmContext.isHAEnabled()) {
+ HAUtil.verifyAndSetConfiguration(conf);
+ }
+ createAndInitActiveServices();
+
super.serviceInit(conf);
}
@@ -217,9 +226,8 @@ public class ResourceManager extends Com
}
protected RMStateStoreOperationFailedEventDispatcher
- createRMStateStoreOperationFailedEventDispatcher() {
- return new RMStateStoreOperationFailedEventDispatcher(
- rmContext.getRMAdminService());
+ createRMStateStoreOperationFailedEventDispatcher() {
+ return new RMStateStoreOperationFailedEventDispatcher(rmContext, this);
}
protected Dispatcher createDispatcher() {
@@ -655,11 +663,14 @@ public class ResourceManager extends Com
@Private
public static class RMStateStoreOperationFailedEventDispatcher implements
EventHandler<RMStateStoreOperationFailedEvent> {
- private final AdminService adminService;
- public RMStateStoreOperationFailedEventDispatcher(
- AdminService adminService) {
- this.adminService = adminService;
+ private final RMContext rmContext;
+ private final ResourceManager rm;
+
+ public RMStateStoreOperationFailedEventDispatcher(RMContext rmContext,
+ ResourceManager resourceManager) {
+ this.rmContext = rmContext;
+ this.rm = resourceManager;
}
@Override
@@ -671,16 +682,14 @@ public class ResourceManager extends Com
}
if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
LOG.info("RMStateStore has been fenced");
- synchronized(adminService) {
- if (adminService.haEnabled) {
- try {
- // Transition to standby and reinit active services
- LOG.info("Transitioning RM to Standby mode");
- adminService.transitionToStandby(true);
- return;
- } catch (Exception e) {
- LOG.error("Failed to transition RM to Standby mode.");
- }
+ if (rmContext.isHAEnabled()) {
+ try {
+ // Transition to standby and reinit active services
+ LOG.info("Transitioning RM to Standby mode");
+ rm.transitionToStandby(true);
+ return;
+ } catch (Exception e) {
+ LOG.error("Failed to transition RM to Standby mode.");
}
}
}
@@ -826,10 +835,6 @@ public class ResourceManager extends Com
webApp = builder.start(new RMWebApp(this));
}
- void setConf(Configuration configuration) {
- conf = configuration;
- }
-
/**
* Helper method to create and init {@link #activeServices}. This creates an
* instance of {@link RMActiveServices} and initializes it.
@@ -870,6 +875,39 @@ public class ResourceManager extends Com
return activeServices != null && activeServices.isInState(STATE.STARTED);
}
+ synchronized void transitionToActive() throws Exception {
+ if (rmContext.getHAServiceState() ==
+ HAServiceProtocol.HAServiceState.ACTIVE) {
+ LOG.info("Already in active state");
+ return;
+ }
+
+ LOG.info("Transitioning to active state");
+ startActiveServices();
+ rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
+ LOG.info("Transitioned to active state");
+ }
+
+ synchronized void transitionToStandby(boolean initialize)
+ throws Exception {
+ if (rmContext.getHAServiceState() ==
+ HAServiceProtocol.HAServiceState.STANDBY) {
+ LOG.info("Already in standby state");
+ return;
+ }
+
+ LOG.info("Transitioning to standby state");
+ if (rmContext.getHAServiceState() ==
+ HAServiceProtocol.HAServiceState.ACTIVE) {
+ stopActiveServices();
+ if (initialize) {
+ createAndInitActiveServices();
+ }
+ }
+ rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
+ LOG.info("Transitioned to standby state");
+ }
+
@Override
protected void serviceStart() throws Exception {
try {
@@ -877,6 +915,13 @@ public class ResourceManager extends Com
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to login", ie);
}
+
+ if (this.rmContext.isHAEnabled()) {
+ transitionToStandby(true);
+ } else {
+ transitionToActive();
+ }
+
super.serviceStart();
}
@@ -888,6 +933,8 @@ public class ResourceManager extends Com
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
+ transitionToStandby(false);
+ rmContext.setHAServiceState(HAServiceState.STOPPING);
}
protected ResourceTrackerService createResourceTrackerService() {
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
Wed Dec 11 23:18:29 2013
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import javax.security.auth.login.Configuration;
+
import junit.framework.Assert;
import org.apache.commons.logging.Log;
@@ -88,7 +90,9 @@ public class TestRM {
public void testAppOnMultiNode() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
- MockRM rm = new MockRM();
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set("yarn.scheduler.capacity.node-locality-delay", "-1");
+ MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("h1:1234", 5120);
MockNM nm2 = rm.registerNode("h2:5678", 10240);
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1550313&r1=1550312&r2=1550313&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
(original)
+++
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
Wed Dec 11 23:18:29 2013
@@ -1066,6 +1066,9 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// node_1 heartbeats in and gets the DEFAULT_RACK request for app_1
+ // We do not need locality delay here
+ doReturn(-1).when(a).getNodeLocalityDelay();
+
a.assignContainers(clusterResource, node_1);
assertEquals(10*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
@@ -1649,7 +1652,7 @@ public class TestLeafQueue {
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
// before reinitialization
- assertEquals(0, e.getNodeLocalityDelay());
+ assertEquals(40, e.getNodeLocalityDelay());
csConf.setInt(CapacitySchedulerConfiguration
.NODE_LOCALITY_DELAY, 60);
@@ -1932,10 +1935,10 @@ public class TestLeafQueue {
// Now, should allocate since RR(rack_1) = relax: true
a.assignContainers(clusterResource, node_1_1);
- verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
+ verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
- assertEquals(0, app_0.getTotalRequiredResources(priority));
+ assertEquals(1, app_0.getTotalRequiredResources(priority));
// Now sanity-check node_local
app_0_requests_0.add(