This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new bdd396b  YARN-8546. Resource leak caused by a reserved container being released more than once under async scheduling. Contributed by Tao Yang.
bdd396b is described below

commit bdd396b26d421aea8747ad3254fc56e77f6fbefa
Author: Weiwei Yang <w...@apache.org>
AuthorDate: Wed Jul 25 17:35:27 2018 +0800

    YARN-8546. Resource leak caused by a reserved container being released more than once under async scheduling. Contributed by Tao Yang.
    
    (cherry picked from commit 5be9f4a5d05c9cb99348719fe35626b1de3055db)
---
 .../scheduler/common/fica/FiCaSchedulerApp.java    | 15 ++++
 .../TestCapacitySchedulerAsyncScheduling.java      | 96 ++++++++++++++++++++++
 2 files changed, 111 insertions(+)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 27a1684..d865e986 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -366,6 +366,21 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         .isEmpty()) {
       for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
           releaseContainer : allocation.getToRelease()) {
+        // Make sure to-release reserved containers are not outdated
+        if (releaseContainer.getRmContainer().getState()
+            == RMContainerState.RESERVED
+            && releaseContainer.getRmContainer() != releaseContainer
+            .getSchedulerNode().getReservedContainer()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to accept this proposal because "
+                + "it tries to release an outdated reserved container "
+                + releaseContainer.getRmContainer().getContainerId()
+                + " on node " + releaseContainer.getSchedulerNode().getNodeID()
+                + " whose reserved container is "
+                + releaseContainer.getSchedulerNode().getReservedContainer());
+          }
+          return false;
+        }
         // Only consider non-reserved container (reserved container will
         // not affect available resource of node) on the same node
         if (releaseContainer.getRmContainer().getState()
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index c4e5493..6481f36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -668,6 +669,101 @@ public class TestCapacitySchedulerAsyncScheduling {
 
     rm.stop();
   }
+ 
+  @Test(timeout = 60000)
+  public void testReleaseOutdatedReservedContainer() throws Exception {
+    /*
+     * Submit a application, reserved container_02 on nm1,
+     * submit two allocate proposals which contain the same reserved
+     * container_02 as to-released container.
+     * First proposal should be accepted, second proposal should be rejected
+     * because it tries to release an outdated reserved container
+     */
+    MockRM rm1 = new MockRM();
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+    MockNM nm3 = rm1.registerNode("h3:1234", 8 * GB);
+    rm1.drainEvents();
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    LeafQueue defaultQueue = (LeafQueue) cs.getQueue("default");
+    SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+    SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());
+    SchedulerNode sn3 = cs.getSchedulerNode(nm3.getNodeId());
+
+    // launch another app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(4 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    Resource allocateResource = Resources.createResource(5 * GB);
+    am1.allocate("*", (int) allocateResource.getMemorySize(), 3, 0,
+        new ArrayList<ContainerId>(), "");
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(9 * GB,
+        defaultQueue.getQueueResourceUsage().getUsed().getMemorySize());
+
+    RMContainer reservedContainer =
+        schedulerApp1.getReservedContainers().get(0);
+    ResourceCommitRequest allocateFromSameReservedContainerProposal1 =
+        createAllocateFromReservedProposal(3, allocateResource, schedulerApp1,
+            sn2, sn1, cs.getRMContext(), reservedContainer);
+    Assert.assertNotNull("Container should be reserved",
+        sn1.getReservedContainer());;
+    Assert.assertEquals("No memory should be used on " + sn2.getNodeName(),
+        0, sn2.getAllocatedResource().getMemorySize());
+    cs.tryCommit(cs.getClusterResource(),
+        allocateFromSameReservedContainerProposal1);
+    Assert.assertNull("Container should have been unreserved",
+        sn1.getReservedContainer());;
+    Assert.assertEquals("Memory should be used on " + sn2.getNodeName(),
+        allocateResource.getMemorySize(),
+        sn2.getAllocatedResource().getMemorySize());
+    ResourceCommitRequest allocateFromSameReservedContainerProposal2 =
+        createAllocateFromReservedProposal(4, allocateResource, schedulerApp1,
+            sn3, sn1, cs.getRMContext(), reservedContainer);
+    cs.tryCommit(cs.getClusterResource(),
+        allocateFromSameReservedContainerProposal2);
+    Assert.assertFalse("This proposal should be rejected because "
+        + "it tries to release an outdated reserved container",
+        sn3.getAllocatedResource().getMemorySize() != 0);
+
+    rm1.close();
+  }
+
+  private ResourceCommitRequest createAllocateFromReservedProposal(
+      int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
+      SchedulerNode allocateNode, SchedulerNode reservedNode,
+      RMContext rmContext, RMContainer reservedContainer) {
+    Container container = Container.newInstance(
+        ContainerId.newContainerId(schedulerApp.getApplicationAttemptId(), containerId),
+        allocateNode.getNodeID(), allocateNode.getHttpAddress(), allocateResource,
+        Priority.newInstance(0), null);
+    RMContainer rmContainer = new RMContainerImpl(container, SchedulerRequestKey
+        .create(ResourceRequest
+            .newInstance(Priority.newInstance(0), "*", allocateResource, 1)),
+        schedulerApp.getApplicationAttemptId(), allocateNode.getNodeID(), "user",
+        rmContext);
+    SchedulerContainer allocateContainer =
+        new SchedulerContainer(schedulerApp, allocateNode, rmContainer, "", true);
+    SchedulerContainer reservedSchedulerContainer =
+        new SchedulerContainer(schedulerApp, reservedNode, reservedContainer, "",
+            false);
+    List<SchedulerContainer> toRelease = new ArrayList<>();
+    toRelease.add(reservedSchedulerContainer);
+    ContainerAllocationProposal allocateFromReservedProposal =
+        new ContainerAllocationProposal(allocateContainer, toRelease, null,
+            NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, allocateResource);
+    List<ContainerAllocationProposal> allocateProposals = new ArrayList<>();
+    allocateProposals.add(allocateFromReservedProposal);
+    return new ResourceCommitRequest(allocateProposals, null, null);
+  }
 
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
       int nContainer, Resource resource, int priority, int startContainerId)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to