Author: vinodkv
Date: Mon Jul 29 18:15:36 2013
New Revision: 1508159
URL: http://svn.apache.org/r1508159
Log:
YARN-245. Fixed NodeManager to handle duplicate responses from ResourceManager.
Contributed by Mayank Bansal.
svn merge --ignore-ancestry -c 1508157 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1508159&r1=1508158&r2=1508159&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Mon Jul 29 18:15:36 2013
@@ -745,6 +745,11 @@ Release 2.1.0-beta - 2013-07-02
YARN-960. Fixed ResourceManager to propagate client-submitted credentials
irrespective of security. (Daryn Sharp via vinodkv)
+ YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu)
+
+ YARN-245. Fixed NodeManager to handle duplicate responses from
+ ResourceManager. (Mayank Bansal via vinodkv)
+
BREAKDOWN OF HADOOP-8562/YARN-191 SUBTASKS AND RELATED JIRAS
YARN-158. Yarn creating package-info.java must not depend on sh.
@@ -810,8 +815,6 @@ Release 2.1.0-beta - 2013-07-02
YARN-909. Disable TestLinuxContainerExecutorWithMocks on Windows. (Chuan Liu
via cnauroth)
- YARN-937. Fix unmanaged AM in non-secure/secure setup post YARN-701. (tucu)
-
Release 2.0.5-alpha - 06/06/2013
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1508159&r1=1508158&r2=1508159&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Jul 29 18:15:36 2013
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.SecurityUtil;
@@ -158,7 +159,7 @@ public class NodeManager extends Composi
addService(del);
// NodeManager level dispatcher
- this.dispatcher = new AsyncDispatcher();
+ this.dispatcher = (AsyncDispatcher) createDispatcher();
nodeHealthChecker = new NodeHealthCheckerService();
addService(nodeHealthChecker);
@@ -203,6 +204,16 @@ public class NodeManager extends Composi
// TODO add local dirs to del
}
+ @Private
+ protected Dispatcher createDispatcher(){
+ return new AsyncDispatcher();
+ }
+
+ @Private
+ public Dispatcher getDispatcher(){
+ return this.dispatcher;
+ }
+
@Override
protected void serviceStart() throws Exception {
try {
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1508159&r1=1508158&r2=1508159&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Jul 29 18:15:36 2013
@@ -369,6 +369,13 @@ public class NodeStatusUpdaterImpl exten
.setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey());
response = resourceTracker.nodeHeartbeat(request);
+ // Checking if the response id is the same which we just processed
+ // If yes then ignore the update.
+ if (lastHeartBeatID != response.getResponseId() - 1) {
+ LOG.info("Discarding the duplicate response "
+ + response.getResponseId());
+ continue;
+ }
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
updateMasterKeys(response);
@@ -395,7 +402,6 @@ public class NodeStatusUpdaterImpl exten
new NodeManagerEvent(NodeManagerEventType.RESYNC));
break;
}
-
lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanup();
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1508159&r1=1508158&r2=1508159&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Jul 29 18:15:36 2013
@@ -59,7 +59,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -431,6 +433,26 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyNodeManager7 extends NodeManager {
+ private ResourceTracker resourceTracker;
+ private MyNodeStatusUpdater3 nodeStatusUpdater;
+
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ this.nodeStatusUpdater =
+ new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics);
+ resourceTracker = new MyResourceTracker7(context);
+ this.nodeStatusUpdater.resourceTracker = resourceTracker;
+
+ return this.nodeStatusUpdater;
+ }
+
+ protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
+ return this.nodeStatusUpdater;
+ }
+ }
+
private class MyNodeManager2 extends NodeManager {
public boolean isStopped = false;
private NodeStatusUpdater nodeStatusUpdater;
@@ -552,6 +574,68 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyResourceTracker7 implements ResourceTracker {
+ public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+ public NodeAction registerNodeAction = NodeAction.NORMAL;
+ private final Context context;
+ private int lastRequestedHeartBeat = 0;
+ private boolean gotDuplicateHeartBeatRequest = false;
+ private ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+
+ MyResourceTracker7(Context context) {
+ this.context = context;
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnException, IOException {
+ RegisterNodeManagerResponse response =
+ recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setNodeAction(registerNodeAction);
+ response.setContainerTokenMasterKey(createMasterKey());
+ response.setNMTokenMasterKey(createMasterKey());
+ return response;
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnException, IOException {
+
+ if (lastRequestedHeartBeat != 0
+ && lastRequestedHeartBeat == request.getNodeStatus().getResponseId()) {
+ LOG.info("GOT Duplicate heartbeatId "
+ + request.getNodeStatus().getResponseId());
+ gotDuplicateHeartBeatRequest = true;
+ }
+ lastRequestedHeartBeat = request.getNodeStatus().getResponseId();
+ LOG.info("Got heartBeatId: [" + heartBeatID + "]");
+ NodeStatus nodeStatus = request.getNodeStatus();
+ nodeStatus.setResponseId(heartBeatID++);
+ NodeHeartbeatResponse nhResponse =
+ YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
+ heartBeatNodeAction, null, null, null, null, 1000L);
+
+ if (heartBeatID == 5) {
+ LOG.info("Sending FINISH_APP for application: [" + appId + "]");
+ this.context.getApplications().put(appId, mock(Application.class));
+ nhResponse
+ .addAllApplicationsToCleanup(Collections.singletonList(appId));
+ }
+ if (heartBeatID == 6) {
+ nhResponse.setResponseId(5);
+ LOG.info("Sending FINISH_APP for application: [" + appId + "]");
+ this.context.getApplications().put(appId, mock(Application.class));
+ nhResponse
+ .addAllApplicationsToCleanup(Collections.singletonList(appId));
+ }
+ return nhResponse;
+ }
+
+ public boolean isGotDuplicateHeartBeatRequest() {
+ return gotDuplicateHeartBeatRequest;
+ }
+ }
+
private class MyResourceTracker4 implements ResourceTracker {
public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -745,7 +829,7 @@ public class TestNodeStatusUpdater {
lfs.delete(new Path(basedir.getPath()), true);
}
- @Test
+ @Test(timeout = 60000)
public void testNMRegistration() throws InterruptedException {
nm = new NodeManager() {
@Override
@@ -805,7 +889,7 @@ public class TestNodeStatusUpdater {
nm.stop();
}
- @Test
+ @Test(timeout = 60000)
public void testStopReentrant() throws Exception {
final AtomicInteger numCleanups = new AtomicInteger(0);
nm = new NodeManager() {
@@ -851,7 +935,49 @@ public class TestNodeStatusUpdater {
Assert.assertEquals(numCleanups.get(), 1);
}
- @Test
+ @SuppressWarnings("rawtypes")
+ class MyDispatcher7 extends AsyncDispatcher {
+ public volatile int finishapp_event;
+
+ protected void dispatch(Event event) {
+ if (event.getType().name()
+ .equals(ContainerManagerEventType.FINISH_APPS.toString())) {
+ ++finishapp_event;
+ }
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testDuplicateResponseFromRM() throws Exception {
+ MyNodeManager7 nm = new MyNodeManager7() {
+ protected Dispatcher createDispatcher() {
+ return new MyDispatcher7();
+ }
+ };
+ try {
+ YarnConfiguration conf = createNMConfig();
+ conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 4000l);
+ nm.init(conf);
+ nm.start();
+ MyResourceTracker7 rt =
+ (MyResourceTracker7) nm.getNodeStatusUpdater().getRMClient();
+ while (heartBeatID < 7) {
+ Thread.sleep(1000l);
+ }
+ Assert.assertTrue(rt.isGotDuplicateHeartBeatRequest());
+
+ MyDispatcher7 nmdispatcher = (MyDispatcher7) nm.getDispatcher();
+ // We are sending two FINISH_APPS in heartbeat 5 and 6
+ // Checking we get only one time FINISH_APPS event which is the first one
+ Assert.assertEquals(1, nmdispatcher.finishapp_event);
+
+ } finally {
+ if (nm.getServiceState() == STATE.STARTED)
+ nm.stop();
+ }
+ }
+
+ @Test(timeout = 60000)
public void testNodeDecommision() throws Exception {
nm = getNodeManager(NodeAction.SHUTDOWN);
YarnConfiguration conf = createNMConfig();
@@ -898,7 +1024,7 @@ public class TestNodeStatusUpdater {
NodeHealthCheckerService healthChecker);
}
- @Test
+ @Test(timeout = 60000)
public void testNMShutdownForRegistrationFailure() throws Exception {
nm = new NodeManagerWithCustomNodeStatusUpdater() {
@@ -1011,7 +1137,7 @@ public class TestNodeStatusUpdater {
* started properly, RM will think that the NM is alive and will retire the NM
* only after NM_EXPIRY interval. See MAPREDUCE-2749.
*/
- @Test
+ @Test(timeout = 60000)
public void testNoRegistrationWhenNMServicesFail() throws Exception {
nm = new NodeManager() {
@@ -1042,7 +1168,7 @@ public class TestNodeStatusUpdater {
verifyNodeStartFailure("Starting of RPC Server failed");
}
- @Test
+ @Test(timeout = 60000)
public void testApplicationKeepAlive() throws Exception {
MyNodeManager nm = new MyNodeManager();
try {