YARN-7258. Add Node and Rack Hints to Opportunistic Scheduler. (Kartheek Muthyala via asuresh).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b733348d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b733348d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b733348d Branch: refs/heads/HDFS-10467 Commit: b733348dde18a242e6c9074c512116a8baf1d281 Parents: 9288206 Author: Arun Suresh <asur...@apache.org> Authored: Mon Oct 2 18:01:51 2017 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Thu Oct 5 09:58:04 2017 -0700 ---------------------------------------------------------------------- .../yarn/api/records/ResourceRequest.java | 16 + .../api/impl/TestDistributedScheduling.java | 649 --------------- .../TestOpportunisticContainerAllocation.java | 784 ------------------- ...TestOpportunisticContainerAllocationE2E.java | 784 +++++++++++++++++++ .../server/api/protocolrecords/RemoteNode.java | 35 + .../impl/pb/RemoteNodePBImpl.java | 19 + .../OpportunisticContainerAllocator.java | 333 ++++++-- .../OpportunisticContainerContext.java | 68 +- .../yarn_server_common_service_protos.proto | 1 + .../TestOpportunisticContainerAllocator.java | 599 ++++++++++++++ ...pportunisticContainerAllocatorAMService.java | 8 +- ...pportunisticContainerAllocatorAMService.java | 2 + 12 files changed, 1781 insertions(+), 1517 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 21fa15f..beb3380 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -240,6 +240,22 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> { } /** + * Set the <code>executionTypeRequest</code> of the request with 'ensure + * execution type' flag set to true. + * @see ResourceRequest#setExecutionTypeRequest( + * ExecutionTypeRequest) + * @param executionType <code>executionType</code> of the request. + * @return {@link ResourceRequestBuilder} + */ + @Public + @Evolving + public ResourceRequestBuilder executionType(ExecutionType executionType) { + resourceRequest.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(executionType, true)); + return this; + } + + /** * Set the <code>allocationRequestId</code> of the request. * @see ResourceRequest#setAllocationRequestId(long) * @param allocationRequestId http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java deleted file mode 100644 index 00f5e03..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ /dev/null @@ -1,649 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.client.api.impl; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NMToken; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.ProfileCapability; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Validates End2End Distributed Scheduling flow which includes the AM - * specifying OPPORTUNISTIC containers in its resource requests, - * the AMRMProxyService on the NM, the DistributedScheduler RequestInterceptor - * on the NM and the DistributedSchedulingProtocol used by the framework to talk - * to the OpportunisticContainerAllocatorAMService running on the RM. - */ -public class TestDistributedScheduling extends BaseAMRMProxyE2ETest { - - private static final Log LOG = - LogFactory.getLog(TestDistributedScheduling.class); - - protected MiniYARNCluster cluster; - protected YarnClient rmClient; - protected ApplicationMasterProtocol client; - protected Configuration conf; - protected Configuration yarnConf; - protected ApplicationAttemptId attemptId; - protected ApplicationId appId; - - @Before - public void doBefore() throws Exception { - cluster = new MiniYARNCluster("testDistributedSchedulingE2E", 1, 1, 1); - - conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration. - OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); - conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); - conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, - 10); - cluster.init(conf); - cluster.start(); - yarnConf = cluster.getConfig(); - - // the client has to connect to AMRMProxy - yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, - YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); - rmClient = YarnClient.createYarnClient(); - rmClient.init(yarnConf); - rmClient.start(); - - // Submit application - attemptId = createApp(rmClient, cluster, conf); - appId = attemptId.getApplicationId(); - client = createAMRMProtocol(rmClient, appId, cluster, yarnConf); - } - - @After - public void doAfter() throws Exception { - if (client != null) { - try { - client.finishApplicationMaster(FinishApplicationMasterRequest - .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null)); - rmClient.killApplication(attemptId.getApplicationId()); - attemptId = null; - } catch (Exception e) { - } - } - if (rmClient != null) { - try { - rmClient.stop(); - } catch (Exception e) { - } - } - if (cluster != null) { - try { - cluster.stop(); - } catch (Exception e) { - } - } - } - - - /** - * Validates if Allocate Requests containing only OPPORTUNISTIC container - * requests are satisfied instantly. - * - * @throws Exception - */ - @Test(timeout = 60000) - public void testOpportunisticExecutionTypeRequestE2E() throws Exception { - LOG.info("testDistributedSchedulingE2E - Register"); - - RegisterApplicationMasterResponse responseRegister = - client.registerApplicationMaster(RegisterApplicationMasterRequest - .newInstance(NetUtils.getHostname(), 1024, "")); - - Assert.assertNotNull(responseRegister); - Assert.assertNotNull(responseRegister.getQueue()); - Assert.assertNotNull(responseRegister.getApplicationACLs()); - Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey()); - Assert - .assertNotNull(responseRegister.getContainersFromPreviousAttempts()); - Assert.assertNotNull(responseRegister.getSchedulerResourceTypes()); - Assert.assertNotNull(responseRegister.getMaximumResourceCapability()); - - // Wait until the RM has been updated and verify - Map<ApplicationId, RMApp> rmApps = - cluster.getResourceManager().getRMContext().getRMApps(); - boolean rmUpdated = false; - for (int i=0; i<10 && !rmUpdated; i++) { - sleep(100); - RMApp rmApp = rmApps.get(appId); - if (rmApp.getState() == RMAppState.RUNNING) { - rmUpdated = true; - } - } - RMApp rmApp = rmApps.get(appId); - Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); - - LOG.info("testDistributedSchedulingE2E - Allocate"); - - AllocateRequest request = - createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); - - // Replace 'ANY' requests with OPPORTUNISTIC aks and remove - // everything else - List<ResourceRequest> newAskList = new ArrayList<>(); - for (ResourceRequest rr : request.getAskList()) { - if (ResourceRequest.ANY.equals(rr.getResourceName())) { - ResourceRequest newRR = ResourceRequest.newInstance(rr - .getPriority(), rr.getResourceName(), - rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), - rr.getNodeLabelExpression(), - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)); - newAskList.add(newRR); - } - } - request.setAskList(newAskList); - - AllocateResponse allocResponse = client.allocate(request); - Assert.assertNotNull(allocResponse); - - // Ensure that all the requests are satisfied immediately - Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); - - // Verify that the allocated containers are OPPORTUNISTIC - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils - .newContainerTokenIdentifier( - allocatedContainer.getContainerToken()); - Assert.assertEquals(ExecutionType.OPPORTUNISTIC, - containerTokenIdentifier.getExecutionType()); - } - - // Check that the RM sees OPPORTUNISTIC containers - ResourceScheduler scheduler = cluster.getResourceManager() - .getResourceScheduler(); - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerId containerId = allocatedContainer.getId(); - RMContainer rmContainer = scheduler.getRMContainer(containerId); - Assert.assertEquals(ExecutionType.OPPORTUNISTIC, - rmContainer.getExecutionType()); - } - - LOG.info("testDistributedSchedulingE2E - Finish"); - } - - /** - * Validates if Allocate Requests containing both GUARANTEED and OPPORTUNISTIC - * container requests works as expected. - * - * @throws Exception - */ - @Test(timeout = 60000) - public void testMixedExecutionTypeRequestE2E() throws Exception { - LOG.info("testDistributedSchedulingE2E - Register"); - - RegisterApplicationMasterResponse responseRegister = - client.registerApplicationMaster(RegisterApplicationMasterRequest - .newInstance(NetUtils.getHostname(), 1024, "")); - - Assert.assertNotNull(responseRegister); - Assert.assertNotNull(responseRegister.getQueue()); - Assert.assertNotNull(responseRegister.getApplicationACLs()); - Assert.assertNotNull(responseRegister.getClientToAMTokenMasterKey()); - Assert - .assertNotNull(responseRegister.getContainersFromPreviousAttempts()); - Assert.assertNotNull(responseRegister.getSchedulerResourceTypes()); - Assert.assertNotNull(responseRegister.getMaximumResourceCapability()); - - RMApp rmApp = - cluster.getResourceManager().getRMContext().getRMApps().get(appId); - Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); - - LOG.info("testDistributedSchedulingE2E - Allocate"); - - AllocateRequest request = - createAllocateRequest(rmClient.getNodeReports(NodeState.RUNNING)); - List<ResourceRequest> askList = request.getAskList(); - List<ResourceRequest> newAskList = new ArrayList<>(askList); - - // Duplicate all ANY requests marking them as opportunistic - for (ResourceRequest rr : askList) { - if (ResourceRequest.ANY.equals(rr.getResourceName())) { - ResourceRequest newRR = ResourceRequest.newInstance(rr - .getPriority(), rr.getResourceName(), - rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), - rr.getNodeLabelExpression(), - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true)); - newAskList.add(newRR); - } - } - request.setAskList(newAskList); - - AllocateResponse allocResponse = client.allocate(request); - Assert.assertNotNull(allocResponse); - - // Ensure that all the requests are satisfied immediately - Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); - - // Verify that the allocated containers are OPPORTUNISTIC - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils - .newContainerTokenIdentifier( - allocatedContainer.getContainerToken()); - Assert.assertEquals(ExecutionType.OPPORTUNISTIC, - containerTokenIdentifier.getExecutionType()); - } - - request.setAskList(new ArrayList<ResourceRequest>()); - request.setResponseId(request.getResponseId() + 1); - - Thread.sleep(1000); - - // RM should allocate GUARANTEED containers within 2 calls to allocate() - allocResponse = client.allocate(request); - Assert.assertNotNull(allocResponse); - Assert.assertEquals(2, allocResponse.getAllocatedContainers().size()); - - // Verify that the allocated containers are GUARANTEED - for (Container allocatedContainer : allocResponse - .getAllocatedContainers()) { - ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils - .newContainerTokenIdentifier( - allocatedContainer.getContainerToken()); - Assert.assertEquals(ExecutionType.GUARANTEED, - containerTokenIdentifier.getExecutionType()); - } - - LOG.info("testDistributedSchedulingE2E - Finish"); - } - - /** - * Validates if AMRMClient can be used with Distributed Scheduling turned on. - * - * @throws Exception - */ - @Test(timeout = 120000) - @SuppressWarnings("unchecked") - public void testAMRMClient() throws Exception { - AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null; - try { - Priority priority = Priority.newInstance(1); - Priority priority2 = Priority.newInstance(2); - Resource capability = Resource.newInstance(1024, 1); - - List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING); - String node = nodeReports.get(0).getNodeId().getHost(); - String rack = nodeReports.get(0).getRackName(); - String[] nodes = new String[]{node}; - String[] racks = new String[]{rack}; - - // start am rm client - amClient = new AMRMClientImpl(client); - amClient.init(yarnConf); - amClient.start(); - amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, ""); - - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority2, - 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority2, - 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority2, - 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - - RemoteRequestsTable<ContainerRequest> remoteRequestsTable = - amClient.getTable(0); - ProfileCapability profileCapability = - ProfileCapability.newInstance(capability); - - int containersRequestedNode = remoteRequestsTable.get(priority, - node, ExecutionType.GUARANTEED, profileCapability).remoteRequest - .getNumContainers(); - int containersRequestedRack = remoteRequestsTable.get(priority, - rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest - .getNumContainers(); - int containersRequestedAny = remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) - .remoteRequest.getNumContainers(); - int oppContainersRequestedAny = - remoteRequestsTable.get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest - .getNumContainers(); - - assertEquals(2, containersRequestedNode); - assertEquals(2, containersRequestedRack); - assertEquals(2, containersRequestedAny); - assertEquals(1, oppContainersRequestedAny); - - assertEquals(4, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - // RM should allocate container within 2 calls to allocate() - int allocatedContainerCount = 0; - int iterationsLeft = 10; - Set<ContainerId> releases = new TreeSet<>(); - - amClient.getNMTokenCache().clearCache(); - Assert.assertEquals(0, - amClient.getNMTokenCache().numberOfTokensInCache()); - HashMap<String, Token> receivedNMTokens = new HashMap<>(); - - while (allocatedContainerCount < - (containersRequestedAny + oppContainersRequestedAny) - && iterationsLeft-- > 0) { - AllocateResponse allocResponse = amClient.allocate(0.1f); - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - allocatedContainerCount += allocResponse.getAllocatedContainers() - .size(); - for (Container container : allocResponse.getAllocatedContainers()) { - ContainerId rejectContainerId = container.getId(); - releases.add(rejectContainerId); - } - - for (NMToken token : allocResponse.getNMTokens()) { - String nodeID = token.getNodeId().toString(); - receivedNMTokens.put(nodeID, token.getToken()); - } - - if (allocatedContainerCount < containersRequestedAny) { - // sleep to let NM's heartbeat to RM and trigger allocations - sleep(100); - } - } - - assertEquals(allocatedContainerCount, - containersRequestedAny + oppContainersRequestedAny); - for (ContainerId rejectContainerId : releases) { - amClient.releaseAssignedContainer(rejectContainerId); - } - assertEquals(3, amClient.release.size()); - assertEquals(0, amClient.ask.size()); - - // need to tell the AMRMClient that we dont need these resources anymore - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, - 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - assertEquals(4, amClient.ask.size()); - - // test RPC exception handling - amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability, - nodes, racks, priority)); - amClient.addContainerRequest(new AMRMClient.ContainerRequest(capability, - nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, - 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - - final AMRMClient amc = amClient; - ApplicationMasterProtocol realRM = amClient.rmClient; - try { - ApplicationMasterProtocol mockRM = mock(ApplicationMasterProtocol - .class); - when(mockRM.allocate(any(AllocateRequest.class))).thenAnswer( - new Answer<AllocateResponse>() { - public AllocateResponse answer(InvocationOnMock invocation) - throws Exception { - amc.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, - racks, priority)); - amc.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, - priority)); - amc.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, - priority2, 0, true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - throw new Exception(); - } - }); - amClient.rmClient = mockRM; - amClient.allocate(0.1f); - } catch (Exception ioe) { - } finally { - amClient.rmClient = realRM; - } - - assertEquals(3, amClient.release.size()); - assertEquals(6, amClient.ask.size()); - - iterationsLeft = 3; - // do a few iterations to ensure RM is not going send new containers - while (iterationsLeft-- > 0) { - // inform RM of rejection - AllocateResponse allocResponse = amClient.allocate(0.1f); - // RM did not send new containers because AM does not need any - assertEquals(0, allocResponse.getAllocatedContainers().size()); - if (allocResponse.getCompletedContainersStatuses().size() > 0) { - for (ContainerStatus cStatus : allocResponse - .getCompletedContainersStatuses()) { - if (releases.contains(cStatus.getContainerId())) { - assertEquals(cStatus.getState(), ContainerState.COMPLETE); - assertEquals(-100, cStatus.getExitStatus()); - releases.remove(cStatus.getContainerId()); - } - } - } - if (iterationsLeft > 0) { - // sleep to make sure NM's heartbeat - sleep(100); - } - } - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, - null, null); - - } finally { - if (amClient != null && amClient.getServiceState() == Service.STATE - .STARTED) { - amClient.stop(); - } - } - } - - /** - * Check if an AM can ask for opportunistic containers and get them. - * @throws Exception - */ - @Test - public void testAMOpportunistic() throws Exception { - // Basic container to request - Resource capability = Resource.newInstance(1024, 1); - Priority priority = Priority.newInstance(1); - - // Get the cluster topology - List<NodeReport> nodeReports = rmClient.getNodeReports(NodeState.RUNNING); - String node = nodeReports.get(0).getNodeId().getHost(); - String rack = nodeReports.get(0).getRackName(); - String[] nodes = new String[]{node}; - String[] racks = new String[]{rack}; - - // Create an AM to request resources - AMRMClient<AMRMClient.ContainerRequest> amClient = null; - try { - amClient = new AMRMClientImpl<AMRMClient.ContainerRequest>(client); - amClient.init(yarnConf); - amClient.start(); - amClient.registerApplicationMaster(NetUtils.getHostname(), 1024, ""); - - // AM requests an opportunistic container - ExecutionTypeRequest execTypeRequest = - ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true); - ContainerRequest containerRequest = new AMRMClient.ContainerRequest( - capability, nodes, racks, priority, 0, true, null, execTypeRequest); - amClient.addContainerRequest(containerRequest); - - // Wait until the container is allocated - ContainerId opportunisticContainerId = null; - for (int i=0; i<10 && opportunisticContainerId == null; i++) { - AllocateResponse allocResponse = amClient.allocate(0.1f); - List<Container> allocatedContainers = - allocResponse.getAllocatedContainers(); - for (Container allocatedContainer : allocatedContainers) { - // Check that this is the container we required - assertEquals(ExecutionType.OPPORTUNISTIC, - allocatedContainer.getExecutionType()); - opportunisticContainerId = allocatedContainer.getId(); - } - sleep(100); - } - assertNotNull(opportunisticContainerId); - - // The RM sees the container as OPPORTUNISTIC - ResourceScheduler scheduler = cluster.getResourceManager() - .getResourceScheduler(); - RMContainer rmContainer = scheduler.getRMContainer( - opportunisticContainerId); - assertEquals(ExecutionType.OPPORTUNISTIC, - rmContainer.getExecutionType()); - - // Release the opportunistic container - amClient.releaseAssignedContainer(opportunisticContainerId); - // Wait for the release container to appear - boolean released = false; - for (int i=0; i<10 && !released; i++) { - AllocateResponse allocResponse = amClient.allocate(0.1f); - List<ContainerStatus> completedContainers = - allocResponse.getCompletedContainersStatuses(); - for (ContainerStatus completedContainer : completedContainers) { - ContainerId completedContainerId = - completedContainer.getContainerId(); - assertEquals(completedContainerId, opportunisticContainerId); - released = true; - } - if (!released) { - sleep(100); - } - } - assertTrue(released); - - // The RM shouldn't see the container anymore - rmContainer = scheduler.getRMContainer(opportunisticContainerId); - assertNull(rmContainer); - - // Clean the AM - amClient.unregisterApplicationMaster( - FinalApplicationStatus.SUCCEEDED, null, null); - } finally { - if (amClient != null && - amClient.getServiceState() == Service.STATE.STARTED) { - amClient.close(); - } - } - } - - private void sleep(int sleepTime) { - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java deleted file mode 100644 index 12c32fc..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java +++ /dev/null @@ -1,784 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.client.api.impl; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; -import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.ContainerUpdateType; -import org.apache.hadoop.yarn.api.records.ExecutionType; -import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.NMToken; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.ProfileCapability; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.api.records.Token; -import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; -import org.apache.hadoop.yarn.api.records.UpdatedContainer; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.ClientRMProxy; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.NMTokenCache; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .AbstractYarnScheduler; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.util.Records; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -/** - * Class that tests the allocation of OPPORTUNISTIC containers through the - * centralized ResourceManager. - */ -public class TestOpportunisticContainerAllocation { - private static Configuration conf = null; - private static MiniYARNCluster yarnCluster = null; - private static YarnClient yarnClient = null; - private static List<NodeReport> nodeReports = null; - private static int nodeCount = 3; - - private static final int ROLLING_INTERVAL_SEC = 13; - private static final long AM_EXPIRE_MS = 4000; - - private static Resource capability; - private static ProfileCapability profileCapability; - private static Priority priority; - private static Priority priority2; - private static Priority priority3; - private static Priority priority4; - private static String node; - private static String rack; - private static String[] nodes; - private static String[] racks; - private final static int DEFAULT_ITERATION = 3; - - // Per test.. - private ApplicationAttemptId attemptId = null; - private AMRMClientImpl<AMRMClient.ContainerRequest> amClient = null; - private long availMB; - private int availVCores; - private long allocMB; - private int allocVCores; - - @BeforeClass - public static void setup() throws Exception { - // start minicluster - conf = new YarnConfiguration(); - conf.setLong( - YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, - ROLLING_INTERVAL_SEC); - conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, AM_EXPIRE_MS); - conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 1000); - // set the minimum allocation so that resource decrease can go under 1024 - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); - conf.setBoolean( - YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true); - conf.setInt( - YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10); - conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); - yarnCluster = - new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1); - yarnCluster.init(conf); - yarnCluster.start(); - - // start rm client - yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); - yarnClient.start(); - - // get node info - nodeReports = yarnClient.getNodeReports(NodeState.RUNNING); - - priority = Priority.newInstance(1); - priority2 = Priority.newInstance(2); - priority3 = Priority.newInstance(3); - priority4 = Priority.newInstance(4); - capability = Resource.newInstance(512, 1); - profileCapability = ProfileCapability.newInstance(capability); - - node = nodeReports.get(0).getNodeId().getHost(); - rack = nodeReports.get(0).getRackName(); - nodes = new String[]{node}; - racks = new String[]{rack}; - } - - @Before - public void startApp() throws Exception { - // submit new app - ApplicationSubmissionContext appContext = - yarnClient.createApplication().getApplicationSubmissionContext(); - ApplicationId appId = appContext.getApplicationId(); - // set the application name - appContext.setApplicationName("Test"); - // Set the priority for the application master - Priority pri = Records.newRecord(Priority.class); - pri.setPriority(0); - appContext.setPriority(pri); - // Set the queue to which this application is to be submitted in the RM - appContext.setQueue("default"); - // Set up the container launch context for the application master - ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext( - Collections.<String, LocalResource>emptyMap(), - new HashMap<String, String>(), Arrays.asList("sleep", "100"), - new HashMap<String, ByteBuffer>(), null, - new HashMap<ApplicationAccessType, String>()); - appContext.setAMContainerSpec(amContainer); - appContext.setResource(Resource.newInstance(1024, 1)); - // Create the request to send to the applications manager - SubmitApplicationRequest appRequest = - Records.newRecord(SubmitApplicationRequest.class); - appRequest.setApplicationSubmissionContext(appContext); - // Submit the application to the applications manager - yarnClient.submitApplication(appContext); - - // wait for app to start - RMAppAttempt appAttempt = null; - while (true) { - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - if (appReport.getYarnApplicationState() == - YarnApplicationState.ACCEPTED) { - attemptId = appReport.getCurrentApplicationAttemptId(); - appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps() - .get(attemptId.getApplicationId()).getCurrentAppAttempt(); - while (true) { - if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { - break; - } - } - break; - } - } - // Just dig into the ResourceManager and get the AMRMToken just for the sake - // of testing. - UserGroupInformation.setLoginUser(UserGroupInformation - .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName())); - - // emulate RM setup of AMRM token in credentials by adding the token - // *before* setting the token service - UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken()); - appAttempt.getAMRMToken() - .setService(ClientRMProxy.getAMRMTokenService(conf)); - - // start am rm client - amClient = (AMRMClientImpl<AMRMClient.ContainerRequest>)AMRMClient - .createAMRMClient(); - - //setting an instance NMTokenCache - amClient.setNMTokenCache(new NMTokenCache()); - //asserting we are not using the singleton instance cache - Assert.assertNotSame(NMTokenCache.getSingleton(), - amClient.getNMTokenCache()); - - amClient.init(conf); - amClient.start(); - - amClient.registerApplicationMaster("Host", 10000, ""); - } - - @After - public void cancelApp() throws YarnException, IOException { - try { - amClient - .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, - null); - } finally { - if (amClient != null && - amClient.getServiceState() == Service.STATE.STARTED) { - amClient.stop(); - } - } - yarnClient.killApplication(attemptId.getApplicationId()); - attemptId = null; - } - - @AfterClass - public static void tearDown() { - if (yarnClient != null && - yarnClient.getServiceState() == Service.STATE.STARTED) { - yarnClient.stop(); - } - if (yarnCluster != null && - yarnCluster.getServiceState() == Service.STATE.STARTED) { - yarnCluster.stop(); - } - } - - @Test(timeout = 60000) - public void testPromotionFromAcquired() throws YarnException, IOException { - // setup container request - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, - true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - - int oppContainersRequestedAny = - amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest - .getNumContainers(); - - assertEquals(1, oppContainersRequestedAny); - - assertEquals(1, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - // RM should allocate container within 2 calls to allocate() - int allocatedContainerCount = 0; - Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>(); - int iterationsLeft = 50; - - amClient.getNMTokenCache().clearCache(); - Assert.assertEquals(0, - amClient.getNMTokenCache().numberOfTokensInCache()); - HashMap<String, Token> receivedNMTokens = new HashMap<>(); - - updateMetrics("Before Opp Allocation"); - - while (allocatedContainerCount < oppContainersRequestedAny - && iterationsLeft-- > 0) { - AllocateResponse allocResponse = amClient.allocate(0.1f); - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - allocatedContainerCount += - allocResponse.getAllocatedContainers().size(); - for (Container container : allocResponse.getAllocatedContainers()) { - if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - allocatedOpportContainers.put(container.getId(), container); - removeCR(container); - } - } - - for (NMToken token : allocResponse.getNMTokens()) { - String nodeID = token.getNodeId().toString(); - receivedNMTokens.put(nodeID, token.getToken()); - } - - if (allocatedContainerCount < oppContainersRequestedAny) { - // sleep to let NM's heartbeat to RM and trigger allocations - sleep(100); - } - } - - assertEquals(oppContainersRequestedAny, allocatedContainerCount); - assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size()); - - updateMetrics("After Opp Allocation / Before Promotion"); - - try { - Container c = allocatedOpportContainers.values().iterator().next(); - amClient.requestContainerUpdate( - c, UpdateContainerRequest.newInstance(c.getVersion(), - c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, - null, ExecutionType.OPPORTUNISTIC)); - Assert.fail("Should throw Exception.."); - } catch (IllegalArgumentException e) { - System.out.println("## " + e.getMessage()); - Assert.assertTrue(e.getMessage().contains( - "target should be GUARANTEED and original should be OPPORTUNISTIC")); - } - - Container c = allocatedOpportContainers.values().iterator().next(); - amClient.requestContainerUpdate( - c, UpdateContainerRequest.newInstance(c.getVersion(), - c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, - null, ExecutionType.GUARANTEED)); - iterationsLeft = 120; - Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>(); - // do a few iterations to ensure RM is not going to send new containers - while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) { - // inform RM of rejection - AllocateResponse allocResponse = amClient.allocate(0.1f); - // RM did not send new containers because AM does not need any - if (allocResponse.getUpdatedContainers() != null) { - for (UpdatedContainer updatedContainer : allocResponse - .getUpdatedContainers()) { - System.out.println("Got update.."); - updatedContainers.put(updatedContainer.getContainer().getId(), - updatedContainer); - } - } - if (iterationsLeft > 0) { - // sleep to make sure NM's heartbeat - sleep(100); - } - } - - updateMetrics("After Promotion"); - - assertEquals(1, updatedContainers.size()); - for (ContainerId cId : allocatedOpportContainers.keySet()) { - Container orig = allocatedOpportContainers.get(cId); - UpdatedContainer updatedContainer = updatedContainers.get(cId); - assertNotNull(updatedContainer); - assertEquals(ExecutionType.GUARANTEED, - updatedContainer.getContainer().getExecutionType()); - assertEquals(orig.getResource(), - updatedContainer.getContainer().getResource()); - assertEquals(orig.getNodeId(), - updatedContainer.getContainer().getNodeId()); - assertEquals(orig.getVersion() + 1, - updatedContainer.getContainer().getVersion()); - } - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - amClient.ask.clear(); - } - - @Test(timeout = 60000) - public void testDemotionFromAcquired() throws YarnException, IOException { - // setup container request - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority3)); - - int guarContainersRequestedAny = amClient.getTable(0).get(priority3, - ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) - .remoteRequest.getNumContainers(); - - assertEquals(1, guarContainersRequestedAny); - - assertEquals(1, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - // RM should allocate container within 2 calls to allocate() - int allocatedContainerCount = 0; - Map<ContainerId, Container> allocatedGuarContainers = new HashMap<>(); - int iterationsLeft = 50; - - amClient.getNMTokenCache().clearCache(); - Assert.assertEquals(0, - amClient.getNMTokenCache().numberOfTokensInCache()); - HashMap<String, Token> receivedNMTokens = new HashMap<>(); - - updateMetrics("Before Guar Allocation"); - - while (allocatedContainerCount < guarContainersRequestedAny - && iterationsLeft-- > 0) { - AllocateResponse allocResponse = amClient.allocate(0.1f); - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - allocatedContainerCount += - allocResponse.getAllocatedContainers().size(); - for (Container container : allocResponse.getAllocatedContainers()) { - if (container.getExecutionType() == ExecutionType.GUARANTEED) { - allocatedGuarContainers.put(container.getId(), container); - removeCR(container); - } - } - - for (NMToken token : allocResponse.getNMTokens()) { - String nodeID = token.getNodeId().toString(); - receivedNMTokens.put(nodeID, token.getToken()); - } - - if (allocatedContainerCount < guarContainersRequestedAny) { - // sleep to let NM's heartbeat to RM and trigger allocations - sleep(100); - } - } - - assertEquals(guarContainersRequestedAny, allocatedContainerCount); - assertEquals(guarContainersRequestedAny, allocatedGuarContainers.size()); - - updateMetrics("After Guar Allocation / Before Demotion"); - - try { - Container c = allocatedGuarContainers.values().iterator().next(); - amClient.requestContainerUpdate( - c, UpdateContainerRequest.newInstance(c.getVersion(), - c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, - null, ExecutionType.GUARANTEED)); - Assert.fail("Should throw Exception.."); - } catch (IllegalArgumentException e) { - System.out.println("## " + e.getMessage()); - Assert.assertTrue(e.getMessage().contains( - "target should be OPPORTUNISTIC and original should be GUARANTEED")); - } - - Container c = allocatedGuarContainers.values().iterator().next(); - amClient.requestContainerUpdate( - c, UpdateContainerRequest.newInstance(c.getVersion(), - c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, - null, ExecutionType.OPPORTUNISTIC)); - iterationsLeft = 120; - Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>(); - // do a few iterations to ensure RM is not going to send new containers - while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) { - // inform RM of rejection - AllocateResponse allocResponse = amClient.allocate(0.1f); - // RM did not send new containers because AM does not need any - if (allocResponse.getUpdatedContainers() != null) { - for (UpdatedContainer updatedContainer : allocResponse - .getUpdatedContainers()) { - System.out.println("Got update.."); - updatedContainers.put(updatedContainer.getContainer().getId(), - updatedContainer); - } - } - if (iterationsLeft > 0) { - // sleep to make sure NM's heartbeat - sleep(100); - } - } - - updateMetrics("After Demotion"); - - assertEquals(1, updatedContainers.size()); - for (ContainerId cId : allocatedGuarContainers.keySet()) { - Container orig = allocatedGuarContainers.get(cId); - UpdatedContainer updatedContainer = updatedContainers.get(cId); - assertNotNull(updatedContainer); - assertEquals(ExecutionType.OPPORTUNISTIC, - updatedContainer.getContainer().getExecutionType()); - assertEquals(orig.getResource(), - updatedContainer.getContainer().getResource()); - assertEquals(orig.getNodeId(), - updatedContainer.getContainer().getNodeId()); - assertEquals(orig.getVersion() + 1, - updatedContainer.getContainer().getVersion()); - } - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - amClient.ask.clear(); - } - - @Test(timeout = 60000) - public void testMixedAllocationAndRelease() throws YarnException, - IOException { - // setup container request - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, - true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, - true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - - int containersRequestedNode = amClient.getTable(0).get(priority, - node, ExecutionType.GUARANTEED, profileCapability).remoteRequest - .getNumContainers(); - int containersRequestedRack = amClient.getTable(0).get(priority, - rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest - .getNumContainers(); - int containersRequestedAny = amClient.getTable(0).get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) - .remoteRequest.getNumContainers(); - int oppContainersRequestedAny = - amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest - .getNumContainers(); - - assertEquals(4, containersRequestedNode); - assertEquals(4, containersRequestedRack); - assertEquals(4, containersRequestedAny); - assertEquals(2, oppContainersRequestedAny); - - assertEquals(4, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority2, 0, - true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - - containersRequestedNode = amClient.getTable(0).get(priority, - node, ExecutionType.GUARANTEED, profileCapability).remoteRequest - .getNumContainers(); - containersRequestedRack = amClient.getTable(0).get(priority, - rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest - .getNumContainers(); - containersRequestedAny = amClient.getTable(0).get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) - .remoteRequest.getNumContainers(); - oppContainersRequestedAny = - amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest - .getNumContainers(); - - assertEquals(2, containersRequestedNode); - assertEquals(2, containersRequestedRack); - assertEquals(2, containersRequestedAny); - assertEquals(1, oppContainersRequestedAny); - - assertEquals(4, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - // RM should allocate container within 2 calls to allocate() - int allocatedContainerCount = 0; - int allocatedOpportContainerCount = 0; - int iterationsLeft = 50; - Set<ContainerId> releases = new TreeSet<>(); - - amClient.getNMTokenCache().clearCache(); - Assert.assertEquals(0, - amClient.getNMTokenCache().numberOfTokensInCache()); - HashMap<String, Token> receivedNMTokens = new HashMap<>(); - - while (allocatedContainerCount < - containersRequestedAny + oppContainersRequestedAny - && iterationsLeft-- > 0) { - AllocateResponse allocResponse = amClient.allocate(0.1f); - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - allocatedContainerCount += - allocResponse.getAllocatedContainers().size(); - for (Container container : allocResponse.getAllocatedContainers()) { - if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) { - allocatedOpportContainerCount++; - } - ContainerId rejectContainerId = container.getId(); - releases.add(rejectContainerId); - } - - for (NMToken token : allocResponse.getNMTokens()) { - String nodeID = token.getNodeId().toString(); - receivedNMTokens.put(nodeID, token.getToken()); - } - - if (allocatedContainerCount < containersRequestedAny) { - // sleep to let NM's heartbeat to RM and trigger allocations - sleep(100); - } - } - - assertEquals(containersRequestedAny + oppContainersRequestedAny, - allocatedContainerCount); - assertEquals(oppContainersRequestedAny, allocatedOpportContainerCount); - for (ContainerId rejectContainerId : releases) { - amClient.releaseAssignedContainer(rejectContainerId); - } - assertEquals(3, amClient.release.size()); - assertEquals(0, amClient.ask.size()); - - // need to tell the AMRMClient that we don't need these resources anymore - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority)); - amClient.removeContainerRequest( - new AMRMClient.ContainerRequest(capability, nodes, racks, priority2, 0, - true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - assertEquals(4, amClient.ask.size()); - - iterationsLeft = 3; - // do a few iterations to ensure RM is not going to send new containers - while (iterationsLeft-- > 0) { - // inform RM of rejection - AllocateResponse allocResponse = amClient.allocate(0.1f); - // RM did not send new containers because AM does not need any - assertEquals(0, allocResponse.getAllocatedContainers().size()); - if (allocResponse.getCompletedContainersStatuses().size() > 0) { - for (ContainerStatus cStatus : allocResponse - .getCompletedContainersStatuses()) { - if (releases.contains(cStatus.getContainerId())) { - assertEquals(cStatus.getState(), ContainerState.COMPLETE); - assertEquals(-100, cStatus.getExitStatus()); - releases.remove(cStatus.getContainerId()); - } - } - } - if (iterationsLeft > 0) { - // sleep to make sure NM's heartbeat - sleep(100); - } - } - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - } - - /** - * Tests allocation with requests comprising only opportunistic containers. - */ - @Test(timeout = 60000) - public void testOpportunisticAllocation() throws YarnException, IOException { - // setup container request - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority3, 0, - true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - amClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority3, 0, - true, null, - ExecutionTypeRequest.newInstance( - ExecutionType.OPPORTUNISTIC, true))); - - int oppContainersRequestedAny = amClient.getTable(0) - .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, - profileCapability).remoteRequest.getNumContainers(); - - assertEquals(2, oppContainersRequestedAny); - - assertEquals(1, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - // RM should allocate container within 2 calls to allocate() - int allocatedContainerCount = 0; - int iterationsLeft = 10; - Set<ContainerId> releases = new TreeSet<>(); - - amClient.getNMTokenCache().clearCache(); - Assert.assertEquals(0, - amClient.getNMTokenCache().numberOfTokensInCache()); - HashMap<String, Token> receivedNMTokens = new HashMap<>(); - - while (allocatedContainerCount < oppContainersRequestedAny - && iterationsLeft-- > 0) { - AllocateResponse allocResponse = amClient.allocate(0.1f); - assertEquals(0, amClient.ask.size()); - assertEquals(0, amClient.release.size()); - - for (Container container : allocResponse.getAllocatedContainers()) { - allocatedContainerCount++; - ContainerId rejectContainerId = container.getId(); - releases.add(rejectContainerId); - } - - for (NMToken token : allocResponse.getNMTokens()) { - String nodeID = token.getNodeId().toString(); - receivedNMTokens.put(nodeID, token.getToken()); - } - - if (allocatedContainerCount < oppContainersRequestedAny) { - // sleep to let NM's heartbeat to RM and trigger allocations - sleep(100); - } - } - - assertEquals(oppContainersRequestedAny, allocatedContainerCount); - assertEquals(1, receivedNMTokens.values().size()); - } - - private void removeCR(Container container) { - List<? extends Collection<AMRMClient.ContainerRequest>> - matchingRequests = amClient.getMatchingRequests(container - .getPriority(), - ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, - container.getResource()); - Set<AMRMClient.ContainerRequest> toRemove = new HashSet<>(); - for (Collection<AMRMClient.ContainerRequest> rc : matchingRequests) { - for (AMRMClient.ContainerRequest cr : rc) { - toRemove.add(cr); - } - } - for (AMRMClient.ContainerRequest cr : toRemove) { - amClient.removeContainerRequest(cr); - } - } - - private void updateMetrics(String msg) { - AbstractYarnScheduler scheduler = - (AbstractYarnScheduler)yarnCluster.getResourceManager() - .getResourceScheduler(); - availMB = scheduler.getRootQueueMetrics().getAvailableMB(); - availVCores = scheduler.getRootQueueMetrics().getAvailableVirtualCores(); - allocMB = scheduler.getRootQueueMetrics().getAllocatedMB(); - allocVCores = scheduler.getRootQueueMetrics().getAllocatedVirtualCores(); - System.out.println("## METRICS (" + msg + ")==>"); - System.out.println(" : availMB=" + availMB + ", " + - "availVCores=" +availVCores + ", " + - "allocMB=" + allocMB + ", " + - "allocVCores=" + allocVCores + ", "); - System.out.println("<== ##"); - } - - private void sleep(int sleepTime) { - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org