YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id has not been reset synchronously. (Jun Gong via rohithsharmaks)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/feaf0349 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/feaf0349 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/feaf0349 Branch: refs/heads/HDFS-7240 Commit: feaf0349949e831ce3f25814c1bbff52f17bfe8f Parents: bcaf839 Author: Rohith Sharma K S <rohithsharm...@apache.org> Authored: Mon Aug 24 11:25:07 2015 +0530 Committer: Rohith Sharma K S <rohithsharm...@apache.org> Committed: Mon Aug 24 11:25:07 2015 +0530 ---------------------------------------------------------------------- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 3 ++ .../yarn/sls/scheduler/RMNodeWrapper.java | 5 +++ hadoop-yarn-project/CHANGES.txt | 3 ++ .../resourcemanager/ResourceTrackerService.java | 2 + .../server/resourcemanager/rmnode/RMNode.java | 7 +++- .../resourcemanager/rmnode/RMNodeImpl.java | 15 +++++--- .../yarn/server/resourcemanager/MockNodes.java | 4 ++ .../resourcetracker/TestNMReconnect.java | 39 ++++++++++++++++++++ 8 files changed, 72 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/feaf0349/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 440779c..2d2c3e0 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -149,6 +149,9 @@ public class NodeInfo { return null; } + public void resetLastNodeHeartBeatResponse() { + } + public List<UpdatedContainerInfo> pullContainerUpdates() { 
ArrayList<UpdatedContainerInfo> list = new ArrayList<UpdatedContainerInfo>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/feaf0349/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index a6633ae..ecc4734 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -135,6 +135,11 @@ public class RMNodeWrapper implements RMNode { } @Override + public void resetLastNodeHeartBeatResponse() { + node.getLastNodeHeartBeatResponse().setResponseId(0); + } + + @Override @SuppressWarnings("unchecked") public List<UpdatedContainerInfo> pullContainerUpdates() { List<UpdatedContainerInfo> list = Collections.EMPTY_LIST; http://git-wip-us.apache.org/repos/asf/hadoop/blob/feaf0349/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5904a31..bf58c96 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -792,6 +792,9 @@ Release 2.8.0 - UNRELEASED YARN-3986. getTransferredContainers in AbstractYarnScheduler should be present in YarnScheduler interface instead. (Varun Saxena via rohithsharmaks) + YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id + has not been reset synchronously. 
(Jun Gong via rohithsharmaks) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/feaf0349/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 3c2c09b..100e991 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -325,6 +325,8 @@ public class ResourceTrackerService extends AbstractService implements } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); + // Reset heartbeat ID since node just restarted. 
+ oldNode.resetLastNodeHeartBeatResponse(); this.rmContext .getDispatcher() .getEventHandler() http://git-wip-us.apache.org/repos/asf/hadoop/blob/feaf0349/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 0386be6..00cd3b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -129,7 +129,12 @@ public interface RMNode { public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); public NodeHeartbeatResponse getLastNodeHeartBeatResponse(); - + + /** + * Reset lastNodeHeartbeatResponse's ID to 0. + */ + void resetLastNodeHeartBeatResponse(); + /** * Get and clear the list of containerUpdates accumulated across NM * heartbeats. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/feaf0349/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index f182d02..7a1ba74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -443,6 +443,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } } + @Override + public void resetLastNodeHeartBeatResponse() { + this.writeLock.lock(); + try { + latestNodeHeartBeatResponse.setResponseId(0); + } finally { + this.writeLock.unlock(); + } + } + public void handle(RMNodeEvent event) { LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType()); try { @@ -617,8 +627,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { new NodeRemovedSchedulerEvent(rmNode)); if (rmNode.getHttpPort() == newNode.getHttpPort()) { - // Reset heartbeat ID since node just restarted. 
- rmNode.getLastNodeHeartBeatResponse().setResponseId(0); if (!rmNode.getTotalCapability().equals( newNode.getTotalCapability())) { rmNode.totalCapability = newNode.getTotalCapability(); @@ -656,9 +664,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode); - // Reset heartbeat ID since node just restarted. - rmNode.getLastNodeHeartBeatResponse().setResponseId(0); - for (ApplicationId appId : reconnectEvent.getRunningApplications()) { handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/feaf0349/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 095fe28..53cb8d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -201,6 +201,10 @@ public class MockNodes { } @Override + public void resetLastNodeHeartBeatResponse() { + } + + @Override public String getNodeManagerVersion() { return null; } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/feaf0349/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index b525efc..dce3d06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; @@ -189,4 +194,38 @@ public class TestNMReconnect { nlm.stop(); scheduler.stop(); } + + @Test(timeout = 10000) + public void testRMNodeStatusAfterReconnect() throws Exception { + // The node(127.0.0.1:1234) reconnected with RM. When it registered with + // RM, RM set its lastNodeHeartbeatResponse's id to 0 asynchronously. 
But + // the node's heartbeat came before RM succeeded in setting the id to 0. + final DrainDispatcher dispatcher = new DrainDispatcher(); + MockRM rm = new MockRM(){ + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm1.registerNode(); + int i = 0; + while(i < 3) { + nm1.nodeHeartbeat(true); + dispatcher.await(); + i++; + } + + MockNM nm2 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm2.registerNode(); + RMNode rmNode = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + nm2.nodeHeartbeat(true); + dispatcher.await(); + Assert.assertEquals("Node is Not in Running state.", NodeState.RUNNING, + rmNode.getState()); + rm.stop(); + } }