Author: tucu
Date: Thu Oct 31 02:57:33 2013
New Revision: 1537369
URL: http://svn.apache.org/r1537369
Log:
YARN-1343. NodeManagers additions/restarts are not reported as node updates in
AllocateResponse responses to AMs. (tucu)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
- copied unchanged from r1537368,
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1537369&r1=1537368&r2=1537369&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Oct 31 02:57:33 2013
@@ -156,6 +156,9 @@ Release 2.2.1 - UNRELEASED
YARN-1358. TestYarnCLI fails on Windows due to line endings. (Chuan Liu via
cnauroth)
+ YARN-1343. NodeManagers additions/restarts are not reported as node updates
+ in AllocateResponse responses to AMs. (tucu)
+
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java?rev=1537369&r1=1537368&r2=1537369&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java Thu Oct 31 02:57:33 2013
@@ -160,17 +160,14 @@ public class NodesListManager extends Ab
if (unusableRMNodesConcurrentSet.contains(eventNode)) {
LOG.debug(eventNode + " reported usable");
unusableRMNodesConcurrentSet.remove(eventNode);
- for (RMApp app : rmContext.getRMApps().values()) {
- this.rmContext
- .getDispatcher()
- .getEventHandler()
- .handle(
- new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
- RMAppNodeUpdateType.NODE_USABLE));
- }
- } else {
- LOG.warn(eventNode
- + " reported usable without first reporting unusable");
+ }
+ for (RMApp app : rmContext.getRMApps().values()) {
+ this.rmContext
+ .getDispatcher()
+ .getEventHandler()
+ .handle(
+ new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
+ RMAppNodeUpdateType.NODE_USABLE));
}
break;
default:
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1537369&r1=1537368&r2=1537369&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Thu Oct 31 02:57:33 2013
@@ -438,7 +438,10 @@ public class RMNodeImpl implements RMNod
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
-
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_USABLE, rmNode));
+
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
// Old node rejoining
@@ -471,7 +474,7 @@ public class RMNodeImpl implements RMNod
// Only add new node if old state is not UNHEALTHY
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode));
- }
+ }
} else {
// Reconnected node differs, so replace old node and start new node
switch (rmNode.getState()) {
@@ -486,6 +489,9 @@ public class RMNodeImpl implements RMNod
rmNode.context.getDispatcher().getEventHandler().handle(
new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
}
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_USABLE, rmNode));
}
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1537369&r1=1537368&r2=1537369&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Thu Oct 31 02:57:33 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -79,6 +81,18 @@ public class TestRMNodeTransitions {
}
}
+ private NodesListManagerEvent nodesListManagerEvent = null;
+
+ private class TestNodeListManagerEventDispatcher implements
+ EventHandler<NodesListManagerEvent> {
+
+ @Override
+ public void handle(NodesListManagerEvent event) {
+ nodesListManagerEvent = event;
+ }
+
+ }
+
@Before
public void setUp() throws Exception {
InlineDispatcher rmDispatcher = new InlineDispatcher();
@@ -109,8 +123,12 @@ public class TestRMNodeTransitions {
rmDispatcher.register(SchedulerEventType.class,
new TestSchedulerEventDispatcher());
+ rmDispatcher.register(NodesListManagerEventType.class,
+ new TestNodeListManagerEventDispatcher());
+
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
+ nodesListManagerEvent = null;
}
@@ -431,8 +449,9 @@ public class TestRMNodeTransitions {
private RMNodeImpl getRunningNode() {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
+ Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
- null, null, null);
+ null, capability, null);
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
Assert.assertEquals(NodeState.RUNNING, node.getState());
return node;
@@ -447,4 +466,60 @@ public class TestRMNodeTransitions {
Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
return node;
}
+
+
+ private RMNodeImpl getNewNode() {
+ NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
+ RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
+ return node;
+ }
+
+ @Test
+ public void testAdd() {
+ RMNodeImpl node = getNewNode();
+ ClusterMetrics cm = ClusterMetrics.getMetrics();
+ int initialActive = cm.getNumActiveNMs();
+ int initialLost = cm.getNumLostNMs();
+ int initialUnhealthy = cm.getUnhealthyNMs();
+ int initialDecommissioned = cm.getNumDecommisionedNMs();
+ int initialRebooted = cm.getNumRebootedNMs();
+ node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+ Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
+ Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+ Assert.assertEquals("Unhealthy Nodes",
+ initialUnhealthy, cm.getUnhealthyNMs());
+ Assert.assertEquals("Decommissioned Nodes",
+ initialDecommissioned, cm.getNumDecommisionedNMs());
+ Assert.assertEquals("Rebooted Nodes",
+ initialRebooted, cm.getNumRebootedNMs());
+ Assert.assertEquals(NodeState.RUNNING, node.getState());
+ Assert.assertNotNull(nodesListManagerEvent);
+ Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+ nodesListManagerEvent.getType());
+ }
+
+ @Test
+ public void testReconnect() {
+ RMNodeImpl node = getRunningNode();
+ ClusterMetrics cm = ClusterMetrics.getMetrics();
+ int initialActive = cm.getNumActiveNMs();
+ int initialLost = cm.getNumLostNMs();
+ int initialUnhealthy = cm.getUnhealthyNMs();
+ int initialDecommissioned = cm.getNumDecommisionedNMs();
+ int initialRebooted = cm.getNumRebootedNMs();
+ node.handle(new RMNodeReconnectEvent(node.getNodeID(), node));
+ Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
+ Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+ Assert.assertEquals("Unhealthy Nodes",
+ initialUnhealthy, cm.getUnhealthyNMs());
+ Assert.assertEquals("Decommissioned Nodes",
+ initialDecommissioned, cm.getNumDecommisionedNMs());
+ Assert.assertEquals("Rebooted Nodes",
+ initialRebooted, cm.getNumRebootedNMs());
+ Assert.assertEquals(NodeState.RUNNING, node.getState());
+ Assert.assertNotNull(nodesListManagerEvent);
+ Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+ nodesListManagerEvent.getType());
+ }
+
}