This is an automated email from the ASF dual-hosted git repository. epayne pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new bdd396b YARN-8546. Resource leak caused by a reserved container being released more than once under async scheduling. Contributed by Tao Yang. bdd396b is described below commit bdd396b26d421aea8747ad3254fc56e77f6fbefa Author: Weiwei Yang <w...@apache.org> AuthorDate: Wed Jul 25 17:35:27 2018 +0800 YARN-8546. Resource leak caused by a reserved container being released more than once under async scheduling. Contributed by Tao Yang. (cherry picked from commit 5be9f4a5d05c9cb99348719fe35626b1de3055db) --- .../scheduler/common/fica/FiCaSchedulerApp.java | 15 ++++ .../TestCapacitySchedulerAsyncScheduling.java | 96 ++++++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 27a1684..d865e986 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -366,6 +366,21 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { .isEmpty()) { for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> releaseContainer : allocation.getToRelease()) { + // Make sure to-release reserved containers are not outdated + if (releaseContainer.getRmContainer().getState() + == RMContainerState.RESERVED + && releaseContainer.getRmContainer() != releaseContainer + 
.getSchedulerNode().getReservedContainer()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to accept this proposal because " + + "it tries to release an outdated reserved container " + + releaseContainer.getRmContainer().getContainerId() + + " on node " + releaseContainer.getSchedulerNode().getNodeID() + + " whose reserved container is " + + releaseContainer.getSchedulerNode().getReservedContainer()); + } + return false; + } // Only consider non-reserved container (reserved container will // not affect available resource of node) on the same node if (releaseContainer.getRmContainer().getState() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index c4e5493..6481f36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -668,6 +669,101 @@ public class TestCapacitySchedulerAsyncScheduling { rm.stop(); } + + @Test(timeout = 60000) + public void testReleaseOutdatedReservedContainer() throws Exception { + /* + * Submit an application, reserve container_02 on nm1, + * then submit two allocate proposals which contain the same reserved + * container_02 as the to-release container. + * The first proposal should be accepted; the second should be rejected + * because it tries to release an outdated reserved container. + */ + MockRM rm1 = new MockRM(); + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); + MockNM nm3 = rm1.registerNode("h3:1234", 8 * GB); + rm1.drainEvents(); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + LeafQueue defaultQueue = (LeafQueue) cs.getQueue("default"); + SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId()); + SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId()); + SchedulerNode sn3 = cs.getSchedulerNode(nm3.getNodeId()); + + // launch an app to the queue; its AM container should be launched on nm1 + RMApp app1 = rm1.submitApp(4 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + Resource allocateResource = Resources.createResource(5 * GB); + am1.allocate("*", (int) allocateResource.getMemorySize(), 3, 0, + new ArrayList<ContainerId>(), ""); + FiCaSchedulerApp schedulerApp1 = + cs.getApplicationAttempt(am1.getApplicationAttemptId()); + + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Assert.assertEquals(1, schedulerApp1.getReservedContainers().size()); + Assert.assertEquals(9 * GB, + defaultQueue.getQueueResourceUsage().getUsed().getMemorySize()); + + RMContainer reservedContainer = + schedulerApp1.getReservedContainers().get(0); + 
ResourceCommitRequest allocateFromSameReservedContainerProposal1 = + createAllocateFromReservedProposal(3, allocateResource, schedulerApp1, + sn2, sn1, cs.getRMContext(), reservedContainer); + Assert.assertNotNull("Container should be reserved", + sn1.getReservedContainer());; + Assert.assertEquals("No memory should be used on " + sn2.getNodeName(), + 0, sn2.getAllocatedResource().getMemorySize()); + cs.tryCommit(cs.getClusterResource(), + allocateFromSameReservedContainerProposal1); + Assert.assertNull("Container should have been unreserved", + sn1.getReservedContainer());; + Assert.assertEquals("Memory should be used on " + sn2.getNodeName(), + allocateResource.getMemorySize(), + sn2.getAllocatedResource().getMemorySize()); + ResourceCommitRequest allocateFromSameReservedContainerProposal2 = + createAllocateFromReservedProposal(4, allocateResource, schedulerApp1, + sn3, sn1, cs.getRMContext(), reservedContainer); + cs.tryCommit(cs.getClusterResource(), + allocateFromSameReservedContainerProposal2); + Assert.assertFalse("This proposal should be rejected because " + + "it tries to release an outdated reserved container", + sn3.getAllocatedResource().getMemorySize() != 0); + + rm1.close(); + } + + private ResourceCommitRequest createAllocateFromReservedProposal( + int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp, + SchedulerNode allocateNode, SchedulerNode reservedNode, + RMContext rmContext, RMContainer reservedContainer) { + Container container = Container.newInstance( + ContainerId.newContainerId(schedulerApp.getApplicationAttemptId(), containerId), + allocateNode.getNodeID(), allocateNode.getHttpAddress(), allocateResource, + Priority.newInstance(0), null); + RMContainer rmContainer = new RMContainerImpl(container, SchedulerRequestKey + .create(ResourceRequest + .newInstance(Priority.newInstance(0), "*", allocateResource, 1)), + schedulerApp.getApplicationAttemptId(), allocateNode.getNodeID(), "user", + rmContext); + SchedulerContainer 
allocateContainer = + new SchedulerContainer(schedulerApp, allocateNode, rmContainer, "", true); + SchedulerContainer reservedSchedulerContainer = + new SchedulerContainer(schedulerApp, reservedNode, reservedContainer, "", + false); + List<SchedulerContainer> toRelease = new ArrayList<>(); + toRelease.add(reservedSchedulerContainer); + ContainerAllocationProposal allocateFromReservedProposal = + new ContainerAllocationProposal(allocateContainer, toRelease, null, + NodeType.OFF_SWITCH, NodeType.OFF_SWITCH, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, allocateResource); + List<ContainerAllocationProposal> allocateProposals = new ArrayList<>(); + allocateProposals.add(allocateFromReservedProposal); + return new ResourceCommitRequest(allocateProposals, null, null); + } private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm, int nContainer, Resource resource, int priority, int startContainerId) --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org