YARN-4677. RMNodeResourceUpdateEvent update from scheduler can lead to race 
condition (wilfreds and gphillips via rkanter)

(cherry picked from commit 0cd145a44390bc1a01113dce4be4e629637c3e8a)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/93d6ed85
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/93d6ed85
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/93d6ed85

Branch: refs/remotes/origin/branch-3.1
Commit: 93d6ed859e92dc132df2b3ae92fabbe5f958004d
Parents: 10e35ed
Author: Robert Kanter <rkan...@apache.org>
Authored: Mon Jun 4 15:32:03 2018 -0700
Committer: Robert Kanter <rkan...@apache.org>
Committed: Mon Jun 4 15:32:47 2018 -0700

----------------------------------------------------------------------
 .../scheduler/AbstractYarnScheduler.java        | 19 +++++----
 .../scheduler/fair/FairScheduler.java           |  2 +-
 .../scheduler/fifo/FifoScheduler.java           |  6 ++-
 .../capacity/TestCapacityScheduler.java         | 44 ++++++++++++++------
 .../scheduler/fair/TestFairScheduler.java       | 43 ++++++++++++++-----
 .../scheduler/fifo/TestFifoScheduler.java       | 44 +++++++++++++++-----
 6 files changed, 115 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d6ed85/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 18c7b4e..d2e81a5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -1106,12 +1106,16 @@ public abstract class AbstractYarnScheduler
     }
 
     // Process new container information
+    // NOTICE: it is possible to not find the NodeID as a node can be
+    // decommissioned at the same time. Skip updates if node is null.
     SchedulerNode schedulerNode = getNode(nm.getNodeID());
     List<ContainerStatus> completedContainers = updateNewContainerInfo(nm,
         schedulerNode);
 
     // Notify Scheduler Node updated.
-    schedulerNode.notifyNodeUpdate();
+    if (schedulerNode != null) {
+      schedulerNode.notifyNodeUpdate();
+    }
 
     // Process completed containers
     Resource releasedResources = Resource.newInstance(0, 0);
@@ -1121,9 +1125,7 @@ public abstract class AbstractYarnScheduler
     // If the node is decommissioning, send an update to have the total
     // resource equal to the used resource, so no available resource to
     // schedule.
-    // TODO YARN-5128: Fix possible race-condition when request comes in before
-    // update is propagated
-    if (nm.getState() == NodeState.DECOMMISSIONING) {
+    if (nm.getState() == NodeState.DECOMMISSIONING && schedulerNode != null) {
       this.rmContext
           .getDispatcher()
           .getEventHandler()
@@ -1133,13 +1135,16 @@ public abstract class AbstractYarnScheduler
     }
 
     updateSchedulerHealthInformation(releasedResources, releasedContainers);
-    updateNodeResourceUtilization(nm, schedulerNode);
+    if (schedulerNode != null) {
+      updateNodeResourceUtilization(nm, schedulerNode);
+    }
 
     // Now node data structures are up-to-date and ready for scheduling.
     if(LOG.isDebugEnabled()) {
       LOG.debug(
-          "Node being looked for scheduling " + nm + " availableResource: "
-              + schedulerNode.getUnallocatedResource());
+          "Node being looked for scheduling " + nm + " availableResource: " +
+              (schedulerNode == null ? "unknown (decommissioned)" :
+                  schedulerNode.getUnallocatedResource()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d6ed85/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 1f85814..13874bf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1030,7 +1030,7 @@ public class FairScheduler extends
         return;
       }
 
-      final NodeId nodeID = node.getNodeID();
+      final NodeId nodeID = (node != null ? node.getNodeID() : null);
       if (!nodeTracker.exists(nodeID)) {
         // The node might have just been removed while this thread was waiting
         // on the synchronized lock before it entered this synchronized method

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d6ed85/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 7ac9027..8396db5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -966,8 +966,10 @@ public class FifoScheduler extends
       return;
     }
 
-    if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
-        node.getUnallocatedResource(), minimumAllocation)) {
+    // A decommissioned node might be removed before we get here
+    if (node != null &&
+        Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
+            node.getUnallocatedResource(), minimumAllocation)) {
       LOG.debug("Node heartbeat " + nm.getNodeID() +
           " available resource = " + node.getUnallocatedResource());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d6ed85/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 1d2aadc..0b54010 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -258,14 +258,12 @@ public class TestCapacityScheduler extends 
CapacitySchedulerTestBase {
     }
   }
 
-  private NodeManager
-      registerNode(String hostName, int containerManagerPort, int httpPort,
-          String rackName, Resource capability)
+  private NodeManager registerNode(String hostName, int containerManagerPort,
+                                   int httpPort, String rackName,
+                                   Resource capability)
           throws IOException, YarnException {
-    NodeManager nm =
-        new NodeManager(
-            hostName, containerManagerPort, httpPort, rackName, capability,
-            resourceManager);
+    NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
+        rackName, capability, resourceManager);
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext()
             .getRMNodes().get(nm.getNodeId()));
@@ -280,13 +278,13 @@ public class TestCapacityScheduler extends 
CapacitySchedulerTestBase {
 
     // Register node1
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
+    NodeManager nm_0 =
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
             Resources.createResource(4 * GB, 1));
 
     // Register node2
     String host_1 = "host_1";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 =
+    NodeManager nm_1 =
         registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
             Resources.createResource(2 * GB, 1));
 
@@ -4038,6 +4036,29 @@ public class TestCapacityScheduler extends 
CapacitySchedulerTestBase {
       Assert.fail("Cannot find RMContainer");
     }
   }
+  @Test
+  public void testRemovedNodeDecomissioningNode() throws Exception {
+    // Register nodemanager
+    NodeManager nm = registerNode("host_decom", 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // force remove the node to simulate race condition
+    ((CapacityScheduler) 
resourceManager.getResourceScheduler()).getNodeTracker().
+        removeNode(nm.getNodeId());
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+  }
 
   @Test
   public void testResourceUpdateDecommissioningNode() throws Exception {
@@ -4064,9 +4085,8 @@ public class TestCapacityScheduler extends 
CapacitySchedulerTestBase {
     ((AsyncDispatcher) mockDispatcher).start();
     // Register node
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(8 * GB, 4));
+    NodeManager nm_0 = registerNode(host_0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d6ed85/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index d9c06a7..3a8e929 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -4969,6 +4970,30 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
   }
 
   @Test
+  public void testRemovedNodeDecomissioningNode() throws Exception {
+    // Register nodemanager
+    NodeManager nm = registerNode("host_decom", 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // Force remove the node to simulate race condition
+    ((FairScheduler) resourceManager.getResourceScheduler())
+        .getNodeTracker().removeNode(nm.getNodeId());
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+  }
+
+  @Test
   public void testResourceUpdateDecommissioningNode() throws Exception {
     // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
     // to have 0 available resource
@@ -4993,9 +5018,8 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
     ((AsyncDispatcher) mockDispatcher).start();
     // Register node
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(8 * GB, 4));
+    NodeManager nm_0 = registerNode(host_0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
 
     RMNode node =
         resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
@@ -5033,13 +5057,12 @@ public class TestFairScheduler extends 
FairSchedulerTestBase {
     Assert.assertEquals(availableResource.getVirtualCores(), 0);
   }
 
-  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager 
registerNode(
-      String hostName, int containerManagerPort, int httpPort, String rackName,
-      Resource capability) throws IOException, YarnException {
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
-        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
-            containerManagerPort, httpPort, rackName, capability,
-            resourceManager);
+  private NodeManager registerNode(String hostName, int containerManagerPort,
+                                   int httpPort, String rackName,
+                                   Resource capability)
+      throws IOException, YarnException {
+    NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
+        rackName, capability, resourceManager);
 
     // after YARN-5375, scheduler event is processed in rm main dispatcher,
     // wait it processed, or may lead dead lock

http://git-wip-us.apache.org/repos/asf/hadoop/blob/93d6ed85/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 8814c0e..ee66a49 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -138,14 +139,12 @@ public class TestFifoScheduler {
     resourceManager.stop();
   }
   
-  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
-      registerNode(String hostName, int containerManagerPort, int nmHttpPort,
-          String rackName, Resource capability) throws IOException,
-          YarnException {
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
-        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
-            containerManagerPort, nmHttpPort, rackName, capability,
-            resourceManager);
+  private NodeManager registerNode(String hostName, int containerManagerPort,
+                                   int nmHttpPort, String rackName,
+                                   Resource capability)
+      throws IOException, YarnException {
+    NodeManager nm = new NodeManager(hostName, containerManagerPort,
+        nmHttpPort, rackName, capability, resourceManager);
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
             .get(nm.getNodeId()));
@@ -1196,6 +1195,30 @@ public class TestFifoScheduler {
   }
 
   @Test
+  public void testRemovedNodeDecomissioningNode() throws Exception {
+    // Register nodemanager
+    NodeManager nm = registerNode("host_decom", 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // Force remove the node to simulate race condition
+    ((FifoScheduler) resourceManager.getResourceScheduler())
+        .getNodeTracker().removeNode(nm.getNodeId());
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+  }
+
+  @Test
   public void testResourceUpdateDecommissioningNode() throws Exception {
     // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
     // to have 0 available resource
@@ -1220,9 +1243,8 @@ public class TestFifoScheduler {
     ((AsyncDispatcher) mockDispatcher).start();
     // Register node
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(8 * GB, 4));
+    NodeManager nm_0 = registerNode(host_0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to